You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/11/22 03:22:55 UTC

(pulsar) branch branch-3.1 updated: [improve][broker] Support not retaining null-key message during topic compaction (#21578) (#21601)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 6ddf83d2131 [improve][broker] Support not retaining null-key message during topic compaction (#21578) (#21601)
6ddf83d2131 is described below

commit 6ddf83d2131194dcf3cccb2299a0f00a497fd5a9
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Wed Nov 22 11:22:48 2023 +0800

    [improve][broker] Support not retaining null-key message during topic compaction (#21578) (#21601)
---
 conf/broker.conf                                   |  3 ++
 conf/standalone.conf                               |  5 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 +++
 .../pulsar/client/impl/RawBatchConverter.java      | 19 ++++++--
 .../pulsar/compaction/TwoPhaseCompactor.java       | 32 ++++++++++---
 .../apache/pulsar/compaction/CompactionTest.java   | 56 ++++++++++++----------
 6 files changed, 84 insertions(+), 37 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 4ad8536fd8d..bc9b644b221 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -538,6 +538,9 @@ brokerServiceCompactionThresholdInBytes=0
 # If the execution time of the compaction phase one loop exceeds this time, the compaction will not proceed.
 brokerServiceCompactionPhaseOneLoopTimeInSeconds=30
 
+# Whether to retain null-key messages during topic compaction
+topicCompactionRemainNullKey=true
+
 # Whether to enable the delayed delivery for messages.
 # If disabled, messages will be immediately delivered and there will
 # be no tracking overhead.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 76223c5933e..b730bbc1290 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -1277,4 +1277,7 @@ brokerInterceptorsDirectory=./interceptors
 brokerInterceptors=
 
 # Enable or disable the broker interceptor, which is only used for testing for now
-disableBrokerInterceptors=true
\ No newline at end of file
+disableBrokerInterceptors=true
+
+# Whether to retain null-key messages during topic compaction
+topicCompactionRemainNullKey=true
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index fb267755913..912182ceba7 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2777,6 +2777,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30;
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Whether retain null-key message during topic compaction."
+    )
+    private boolean topicCompactionRemainNullKey = true;
+
     @FieldContext(
         category = CATEGORY_SERVER,
         doc = "Interval between checks to see if cluster is migrated and marks topic migrated "
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index 167cc1b699c..b6be228788a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -92,6 +92,11 @@ public class RawBatchConverter {
         return idsAndKeysAndSize;
     }
 
+    public static Optional<RawMessage> rebatchMessage(RawMessage msg,
+                                                      BiPredicate<String, MessageId> filter) throws IOException {
+        return rebatchMessage(msg, filter, true);
+    }
+
     /**
      * Take a batched message and a filter, and returns a message with the only the sub-messages
      * which match the filter. Returns an empty optional if no messages match.
@@ -99,7 +104,8 @@ public class RawBatchConverter {
      *  NOTE: this message does not alter the reference count of the RawMessage argument.
      */
     public static Optional<RawMessage> rebatchMessage(RawMessage msg,
-                                                      BiPredicate<String, MessageId> filter)
+                                                      BiPredicate<String, MessageId> filter,
+                                                      boolean retainNullKey)
             throws IOException {
         checkArgument(msg.getMessageIdData().getBatchIndex() == -1);
 
@@ -135,9 +141,14 @@ public class RawBatchConverter {
                                                       msg.getMessageIdData().getPartition(),
                                                       i);
                 if (!singleMessageMetadata.hasPartitionKey()) {
-                    messagesRetained++;
-                    Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
-                                                                      singleMessagePayload, batchBuffer);
+                    if (retainNullKey) {
+                        messagesRetained++;
+                        Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
+                                singleMessagePayload, batchBuffer);
+                    } else {
+                        Commands.serializeSingleMessageInBatchWithPayload(emptyMetadata,
+                                Unpooled.EMPTY_BUFFER, batchBuffer);
+                    }
                 } else if (filter.test(singleMessageMetadata.getPartitionKey(), id)
                            && singleMessagePayload.readableBytes() > 0) {
                     messagesRetained++;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index e4e067ad611..cb39cc93154 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -62,6 +62,7 @@ public class TwoPhaseCompactor extends Compactor {
     private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
     private static final int MAX_OUTSTANDING = 500;
     private final Duration phaseOneLoopReadTimeout;
+    private final boolean topicCompactionRemainNullKey;
 
     public TwoPhaseCompactor(ServiceConfiguration conf,
                              PulsarClient pulsar,
@@ -69,6 +70,7 @@ public class TwoPhaseCompactor extends Compactor {
                              ScheduledExecutorService scheduler) {
         super(conf, pulsar, bk, scheduler);
         phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
+        topicCompactionRemainNullKey = conf.isTopicCompactionRemainNullKey();
     }
 
     @Override
@@ -134,6 +136,14 @@ public class TwoPhaseCompactor extends Compactor {
                         int deleteCnt = 0;
                         for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(m)) {
                             if (e != null) {
+                                if (e.getMiddle() == null) {
+                                    if (!topicCompactionRemainNullKey) {
+                                        // record delete null-key message event
+                                        deleteCnt++;
+                                        mxBean.addCompactionRemovedEvent(reader.getTopic());
+                                    }
+                                    continue;
+                                }
                                 if (e.getRight() > 0) {
                                     MessageId old = latestForKey.put(e.getMiddle(), e.getLeft());
                                     if (old != null) {
@@ -163,6 +173,10 @@ public class TwoPhaseCompactor extends Compactor {
                             deletedMessage = true;
                             latestForKey.remove(keyAndSize.getLeft());
                         }
+                    } else {
+                        if (!topicCompactionRemainNullKey) {
+                            deletedMessage = true;
+                        }
                     }
                     if (replaceMessage || deletedMessage) {
                         mxBean.addCompactionRemovedEvent(reader.getTopic());
@@ -249,8 +263,8 @@ public class TwoPhaseCompactor extends Compactor {
                 mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
                 if (RawBatchConverter.isReadableBatch(m)) {
                     try {
-                        messageToAdd = rebatchMessage(
-                                m, (key, subid) -> subid.equals(latestForKey.get(key)));
+                        messageToAdd = rebatchMessage(reader.getTopic(),
+                                m, (key, subid) -> subid.equals(latestForKey.get(key)), topicCompactionRemainNullKey);
                     } catch (IOException ioe) {
                         log.info("Error decoding batch for message {}. Whole batch will be included in output",
                                 id, ioe);
@@ -259,8 +273,8 @@ public class TwoPhaseCompactor extends Compactor {
                 } else {
                     Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
                     MessageId msg;
-                    if (keyAndSize == null) { // pass through messages without a key
-                        messageToAdd = Optional.of(m);
+                    if (keyAndSize == null) {
+                        messageToAdd = topicCompactionRemainNullKey ? Optional.of(m) : Optional.empty();
                     } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
                             && msg.equals(id)) { // consider message only if present into latestForKey map
                         if (keyAndSize.getRight() <= 0) {
@@ -416,12 +430,16 @@ public class TwoPhaseCompactor extends Compactor {
 
     protected List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKeysAndSizeFromBatch(RawMessage msg)
             throws IOException {
-        return RawBatchConverter.extractIdsAndKeysAndSize(msg, false);
+        return RawBatchConverter.extractIdsAndKeysAndSize(msg);
     }
 
-    protected Optional<RawMessage> rebatchMessage(RawMessage msg, BiPredicate<String, MessageId> filter)
+    protected Optional<RawMessage> rebatchMessage(String topic, RawMessage msg, BiPredicate<String, MessageId> filter,
+                                                  boolean retainNullKey)
             throws IOException {
-        return RawBatchConverter.rebatchMessage(msg, filter);
+        if (log.isDebugEnabled()) {
+            log.debug("Rebatching message {} for topic {}", msg.getMessageId(), topic);
+        }
+        return RawBatchConverter.rebatchMessage(msg, filter, retainNullKey);
     }
 
     private static class PhaseOneResult {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 52837cbdcd5..5ee12d660e0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.buffer.ByteBuf;
@@ -640,8 +641,17 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         }
     }
 
-    @Test
-    public void testKeyLessMessagesPassThrough() throws Exception {
+    @DataProvider(name = "retainNullKey")
+    public static Object[][] retainNullKey() {
+        return new Object[][] {{true}, {false}};
+    }
+
+    @Test(dataProvider = "retainNullKey")
+    public void testKeyLessMessagesPassThrough(boolean retainNullKey) throws Exception {
+        conf.setTopicCompactionRemainNullKey(retainNullKey);
+        restartBroker();
+        FieldUtils.writeDeclaredField(compactor, "topicCompactionRemainNullKey", retainNullKey, true);
+
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
         // subscribe before sending anything, so that we get all messages
@@ -682,29 +692,25 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
                 Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
                 assertNull(m);
             } else {
-                Message<byte[]> message1 = consumer.receive();
-                Assert.assertFalse(message1.hasKey());
-                Assert.assertEquals(new String(message1.getData()), "my-message-1");
-
-                Message<byte[]> message2 = consumer.receive();
-                Assert.assertFalse(message2.hasKey());
-                Assert.assertEquals(new String(message2.getData()), "my-message-2");
-
-                Message<byte[]> message3 = consumer.receive();
-                Assert.assertEquals(message3.getKey(), "key1");
-                Assert.assertEquals(new String(message3.getData()), "my-message-4");
-
-                Message<byte[]> message4 = consumer.receive();
-                Assert.assertEquals(message4.getKey(), "key2");
-                Assert.assertEquals(new String(message4.getData()), "my-message-6");
-
-                Message<byte[]> message5 = consumer.receive();
-                Assert.assertFalse(message5.hasKey());
-                Assert.assertEquals(new String(message5.getData()), "my-message-7");
+                List<Pair<String, String>> result = new ArrayList<>();
+                while (true) {
+                    Message<byte[]> message = consumer.receive(10, TimeUnit.SECONDS);
+                    if (message == null) {
+                        break;
+                    }
+                    result.add(Pair.of(message.getKey(), message.getData() == null ? null : new String(message.getData())));
+                }
 
-                Message<byte[]> message6 = consumer.receive();
-                Assert.assertFalse(message6.hasKey());
-                Assert.assertEquals(new String(message6.getData()), "my-message-8");
+                List<Pair<String, String>> expectList;
+                if (retainNullKey) {
+                    expectList = List.of(
+                        Pair.of(null, "my-message-1"), Pair.of(null, "my-message-2"),
+                        Pair.of("key1", "my-message-4"), Pair.of("key2", "my-message-6"),
+                        Pair.of(null, "my-message-7"), Pair.of(null, "my-message-8"));
+                } else {
+                    expectList = List.of(Pair.of("key1", "my-message-4"), Pair.of("key2", "my-message-6"));
+                }
+                Assert.assertEquals(result, expectList);
             }
         }
     }
@@ -1885,7 +1891,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
                 .topic(topicName).create();
 
         for (int i = 0; i < 10; i+=2) {
-            producer.newMessage().key(null).value(new byte[4*1024*1024]).send();
+            producer.newMessage().key(UUID.randomUUID().toString()).value(new byte[4*1024*1024]).send();
         }
         producer.flush();