You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/11/22 03:22:55 UTC
(pulsar) branch branch-3.1 updated: [improve][broker] Support not retaining null-key message during topic compaction (#21578) (#21601)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 6ddf83d2131 [improve][broker] Support not retaining null-key message during topic compaction (#21578) (#21601)
6ddf83d2131 is described below
commit 6ddf83d2131194dcf3cccb2299a0f00a497fd5a9
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Wed Nov 22 11:22:48 2023 +0800
[improve][broker] Support not retaining null-key message during topic compaction (#21578) (#21601)
---
conf/broker.conf | 3 ++
conf/standalone.conf | 5 +-
.../apache/pulsar/broker/ServiceConfiguration.java | 6 +++
.../pulsar/client/impl/RawBatchConverter.java | 19 ++++++--
.../pulsar/compaction/TwoPhaseCompactor.java | 32 ++++++++++---
.../apache/pulsar/compaction/CompactionTest.java | 56 ++++++++++++----------
6 files changed, 84 insertions(+), 37 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 4ad8536fd8d..bc9b644b221 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -538,6 +538,9 @@ brokerServiceCompactionThresholdInBytes=0
# If the execution time of the compaction phase one loop exceeds this time, the compaction will not proceed.
brokerServiceCompactionPhaseOneLoopTimeInSeconds=30
+# Whether retain null-key message during topic compaction
+topicCompactionRemainNullKey=true
+
# Whether to enable the delayed delivery for messages.
# If disabled, messages will be immediately delivered and there will
# be no tracking overhead.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 76223c5933e..b730bbc1290 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -1277,4 +1277,7 @@ brokerInterceptorsDirectory=./interceptors
brokerInterceptors=
# Enable or disable the broker interceptor, which is only used for testing for now
-disableBrokerInterceptors=true
\ No newline at end of file
+disableBrokerInterceptors=true
+
+# Whether retain null-key message during topic compaction
+topicCompactionRemainNullKey=true
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index fb267755913..912182ceba7 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2777,6 +2777,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Whether retain null-key message during topic compaction."
+ )
+ private boolean topicCompactionRemainNullKey = true;
+
@FieldContext(
category = CATEGORY_SERVER,
doc = "Interval between checks to see if cluster is migrated and marks topic migrated "
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index 167cc1b699c..b6be228788a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -92,6 +92,11 @@ public class RawBatchConverter {
return idsAndKeysAndSize;
}
+ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
+ BiPredicate<String, MessageId> filter) throws IOException {
+ return rebatchMessage(msg, filter, true);
+ }
+
/**
* Take a batched message and a filter, and returns a message with the only the sub-messages
* which match the filter. Returns an empty optional if no messages match.
@@ -99,7 +104,8 @@ public class RawBatchConverter {
* NOTE: this message does not alter the reference count of the RawMessage argument.
*/
public static Optional<RawMessage> rebatchMessage(RawMessage msg,
- BiPredicate<String, MessageId> filter)
+ BiPredicate<String, MessageId> filter,
+ boolean retainNullKey)
throws IOException {
checkArgument(msg.getMessageIdData().getBatchIndex() == -1);
@@ -135,9 +141,14 @@ public class RawBatchConverter {
msg.getMessageIdData().getPartition(),
i);
if (!singleMessageMetadata.hasPartitionKey()) {
- messagesRetained++;
- Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
- singleMessagePayload, batchBuffer);
+ if (retainNullKey) {
+ messagesRetained++;
+ Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
+ singleMessagePayload, batchBuffer);
+ } else {
+ Commands.serializeSingleMessageInBatchWithPayload(emptyMetadata,
+ Unpooled.EMPTY_BUFFER, batchBuffer);
+ }
} else if (filter.test(singleMessageMetadata.getPartitionKey(), id)
&& singleMessagePayload.readableBytes() > 0) {
messagesRetained++;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index e4e067ad611..cb39cc93154 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -62,6 +62,7 @@ public class TwoPhaseCompactor extends Compactor {
private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
private static final int MAX_OUTSTANDING = 500;
private final Duration phaseOneLoopReadTimeout;
+ private final boolean topicCompactionRemainNullKey;
public TwoPhaseCompactor(ServiceConfiguration conf,
PulsarClient pulsar,
@@ -69,6 +70,7 @@ public class TwoPhaseCompactor extends Compactor {
ScheduledExecutorService scheduler) {
super(conf, pulsar, bk, scheduler);
phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
+ topicCompactionRemainNullKey = conf.isTopicCompactionRemainNullKey();
}
@Override
@@ -134,6 +136,14 @@ public class TwoPhaseCompactor extends Compactor {
int deleteCnt = 0;
for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(m)) {
if (e != null) {
+ if (e.getMiddle() == null) {
+ if (!topicCompactionRemainNullKey) {
+ // record delete null-key message event
+ deleteCnt++;
+ mxBean.addCompactionRemovedEvent(reader.getTopic());
+ }
+ continue;
+ }
if (e.getRight() > 0) {
MessageId old = latestForKey.put(e.getMiddle(), e.getLeft());
if (old != null) {
@@ -163,6 +173,10 @@ public class TwoPhaseCompactor extends Compactor {
deletedMessage = true;
latestForKey.remove(keyAndSize.getLeft());
}
+ } else {
+ if (!topicCompactionRemainNullKey) {
+ deletedMessage = true;
+ }
}
if (replaceMessage || deletedMessage) {
mxBean.addCompactionRemovedEvent(reader.getTopic());
@@ -249,8 +263,8 @@ public class TwoPhaseCompactor extends Compactor {
mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
if (RawBatchConverter.isReadableBatch(m)) {
try {
- messageToAdd = rebatchMessage(
- m, (key, subid) -> subid.equals(latestForKey.get(key)));
+ messageToAdd = rebatchMessage(reader.getTopic(),
+ m, (key, subid) -> subid.equals(latestForKey.get(key)), topicCompactionRemainNullKey);
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole batch will be included in output",
id, ioe);
@@ -259,8 +273,8 @@ public class TwoPhaseCompactor extends Compactor {
} else {
Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
MessageId msg;
- if (keyAndSize == null) { // pass through messages without a key
- messageToAdd = Optional.of(m);
+ if (keyAndSize == null) {
+ messageToAdd = topicCompactionRemainNullKey ? Optional.of(m) : Optional.empty();
} else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
&& msg.equals(id)) { // consider message only if present into latestForKey map
if (keyAndSize.getRight() <= 0) {
@@ -416,12 +430,16 @@ public class TwoPhaseCompactor extends Compactor {
protected List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKeysAndSizeFromBatch(RawMessage msg)
throws IOException {
- return RawBatchConverter.extractIdsAndKeysAndSize(msg, false);
+ return RawBatchConverter.extractIdsAndKeysAndSize(msg);
}
- protected Optional<RawMessage> rebatchMessage(RawMessage msg, BiPredicate<String, MessageId> filter)
+ protected Optional<RawMessage> rebatchMessage(String topic, RawMessage msg, BiPredicate<String, MessageId> filter,
+ boolean retainNullKey)
throws IOException {
- return RawBatchConverter.rebatchMessage(msg, filter);
+ if (log.isDebugEnabled()) {
+ log.debug("Rebatching message {} for topic {}", msg.getMessageId(), topic);
+ }
+ return RawBatchConverter.rebatchMessage(msg, filter, retainNullKey);
}
private static class PhaseOneResult {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 52837cbdcd5..5ee12d660e0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
@@ -640,8 +641,17 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
}
}
- @Test
- public void testKeyLessMessagesPassThrough() throws Exception {
+ @DataProvider(name = "retainNullKey")
+ public static Object[][] retainNullKey() {
+ return new Object[][] {{true}, {false}};
+ }
+
+ @Test(dataProvider = "retainNullKey")
+ public void testKeyLessMessagesPassThrough(boolean retainNullKey) throws Exception {
+ conf.setTopicCompactionRemainNullKey(retainNullKey);
+ restartBroker();
+ FieldUtils.writeDeclaredField(compactor, "topicCompactionRemainNullKey", retainNullKey, true);
+
String topic = "persistent://my-property/use/my-ns/my-topic1";
// subscribe before sending anything, so that we get all messages
@@ -682,29 +692,25 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
assertNull(m);
} else {
- Message<byte[]> message1 = consumer.receive();
- Assert.assertFalse(message1.hasKey());
- Assert.assertEquals(new String(message1.getData()), "my-message-1");
-
- Message<byte[]> message2 = consumer.receive();
- Assert.assertFalse(message2.hasKey());
- Assert.assertEquals(new String(message2.getData()), "my-message-2");
-
- Message<byte[]> message3 = consumer.receive();
- Assert.assertEquals(message3.getKey(), "key1");
- Assert.assertEquals(new String(message3.getData()), "my-message-4");
-
- Message<byte[]> message4 = consumer.receive();
- Assert.assertEquals(message4.getKey(), "key2");
- Assert.assertEquals(new String(message4.getData()), "my-message-6");
-
- Message<byte[]> message5 = consumer.receive();
- Assert.assertFalse(message5.hasKey());
- Assert.assertEquals(new String(message5.getData()), "my-message-7");
+ List<Pair<String, String>> result = new ArrayList<>();
+ while (true) {
+ Message<byte[]> message = consumer.receive(10, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ result.add(Pair.of(message.getKey(), message.getData() == null ? null : new String(message.getData())));
+ }
- Message<byte[]> message6 = consumer.receive();
- Assert.assertFalse(message6.hasKey());
- Assert.assertEquals(new String(message6.getData()), "my-message-8");
+ List<Pair<String, String>> expectList;
+ if (retainNullKey) {
+ expectList = List.of(
+ Pair.of(null, "my-message-1"), Pair.of(null, "my-message-2"),
+ Pair.of("key1", "my-message-4"), Pair.of("key2", "my-message-6"),
+ Pair.of(null, "my-message-7"), Pair.of(null, "my-message-8"));
+ } else {
+ expectList = List.of(Pair.of("key1", "my-message-4"), Pair.of("key2", "my-message-6"));
+ }
+ Assert.assertEquals(result, expectList);
}
}
}
@@ -1885,7 +1891,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
.topic(topicName).create();
for (int i = 0; i < 10; i+=2) {
- producer.newMessage().key(null).value(new byte[4*1024*1024]).send();
+ producer.newMessage().key(UUID.randomUUID().toString()).value(new byte[4*1024*1024]).send();
}
producer.flush();