You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/07/07 05:31:25 UTC
[pulsar] 01/05: Make the compaction phase one loop timeout configurable (#11206)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 16633386930021324387906a14bb81920bdae304
Author: lipenghui <pe...@apache.org>
AuthorDate: Sun Jul 4 08:48:43 2021 +0800
Make the compaction phase one loop timeout configurable (#11206)
(cherry picked from commit 96e4fd69e5376fa2924517b045d02f8229679b92)
---
conf/broker.conf | 4 ++++
.../org/apache/pulsar/broker/ServiceConfiguration.java | 7 +++++++
.../org/apache/pulsar/compaction/TwoPhaseCompactor.java | 9 +++++++--
.../java/org/apache/pulsar/compaction/CompactorTest.java | 14 ++++++++++++++
4 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index e8c7877..775f9aa 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -450,6 +450,10 @@ brokerServiceCompactionMonitorIntervalInSeconds=60
# Using a value of 0, is disabling compression check.
brokerServiceCompactionThresholdInBytes=0
+# Timeout for the compaction phase one loop.
+# If the execution time of the compaction phase one loop exceeds this time, the compaction will not proceed.
+brokerServiceCompactionPhaseOneLoopTimeInSeconds=30
+
# Whether to enable the delayed delivery for messages.
# If disabled, messages will be immediately delivered and there will
# be no tracking overhead.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 8bd05c2..ebefd59 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1882,6 +1882,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
private long brokerServiceCompactionThresholdInBytes = 0;
@FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Timeout for the compaction phase one loop, If the execution time of the compaction " +
+ "phase one loop exceeds this time, the compaction will not proceed."
+ )
+ private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30;
+
+ @FieldContext(
category = CATEGORY_SCHEMA,
doc = "Enforce schema validation on following cases:\n\n"
+ " - if a producer without a schema attempts to produce to a topic with schema, the producer will be\n"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 800182e..0f0f981 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -60,13 +60,14 @@ public class TwoPhaseCompactor extends Compactor {
private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
private static final int MAX_OUTSTANDING = 500;
private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
- public static final Duration PHASE_ONE_LOOP_READ_TIMEOUT = Duration.ofSeconds(10);
+ private final Duration phaseOneLoopReadTimeout;
public TwoPhaseCompactor(ServiceConfiguration conf,
PulsarClient pulsar,
BookKeeper bk,
ScheduledExecutorService scheduler) {
super(conf, pulsar, bk, scheduler);
+ phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
}
@Override
@@ -116,7 +117,7 @@ public class TwoPhaseCompactor extends Compactor {
}
CompletableFuture<RawMessage> future = reader.readNextAsync();
FutureUtil.addTimeoutHandling(future,
- PHASE_ONE_LOOP_READ_TIMEOUT, scheduler,
+ phaseOneLoopReadTimeout, scheduler,
() -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)"));
future.thenAcceptAsync(m -> {
@@ -399,4 +400,8 @@ public class TwoPhaseCompactor extends Compactor {
this.latestForKey = latestForKey;
}
}
+
+ public long getPhaseOneLoopReadTimeoutInSeconds() {
+ return phaseOneLoopReadTimeout.getSeconds();
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 57a2146..0d1a95c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -39,15 +39,19 @@ import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -220,6 +224,16 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
compactor.compact(topic).get();
}
+ @Test
+ public void testPhaseOneLoopTimeConfiguration() {
+ ServiceConfiguration configuration = new ServiceConfiguration();
+ configuration.setBrokerServiceCompactionPhaseOneLoopTimeInSeconds(60);
+ TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration, Mockito.mock(PulsarClientImpl.class),
+ Mockito.mock(BookKeeper.class), compactionScheduler);
+ Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 60);
+
+ }
+
public ByteBuf extractPayload(RawMessage m) throws Exception {
ByteBuf payloadAndMetadata = m.getHeadersAndPayload();
Commands.skipChecksumIfPresent(payloadAndMetadata);