You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/07/07 05:31:25 UTC

[pulsar] 01/05: Make the compaction phase one loop timeout configurable (#11206)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 16633386930021324387906a14bb81920bdae304
Author: lipenghui <pe...@apache.org>
AuthorDate: Sun Jul 4 08:48:43 2021 +0800

    Make the compaction phase one loop timeout configurable (#11206)
    
    
    (cherry picked from commit 96e4fd69e5376fa2924517b045d02f8229679b92)
---
 conf/broker.conf                                           |  4 ++++
 .../org/apache/pulsar/broker/ServiceConfiguration.java     |  7 +++++++
 .../org/apache/pulsar/compaction/TwoPhaseCompactor.java    |  9 +++++++--
 .../java/org/apache/pulsar/compaction/CompactorTest.java   | 14 ++++++++++++++
 4 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index e8c7877..775f9aa 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -450,6 +450,10 @@ brokerServiceCompactionMonitorIntervalInSeconds=60
 # Using a value of 0, is disabling compression check.
 brokerServiceCompactionThresholdInBytes=0
 
+# Timeout for the compaction phase one loop.
+# If the execution time of the compaction phase one loop exceeds this time, the compaction will not proceed.
+brokerServiceCompactionPhaseOneLoopTimeInSeconds=30
+
 # Whether to enable the delayed delivery for messages.
 # If disabled, messages will be immediately delivered and there will
 # be no tracking overhead.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 8bd05c2..ebefd59 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1882,6 +1882,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private long brokerServiceCompactionThresholdInBytes = 0;
 
     @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Timeout for the compaction phase one loop, If the execution time of the compaction " +
+                    "phase one loop exceeds this time, the compaction will not proceed."
+    )
+    private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30;
+
+    @FieldContext(
         category = CATEGORY_SCHEMA,
         doc = "Enforce schema validation on following cases:\n\n"
             + " - if a producer without a schema attempts to produce to a topic with schema, the producer will be\n"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 800182e..0f0f981 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -60,13 +60,14 @@ public class TwoPhaseCompactor extends Compactor {
     private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
     private static final int MAX_OUTSTANDING = 500;
     private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
-    public static final Duration PHASE_ONE_LOOP_READ_TIMEOUT = Duration.ofSeconds(10);
+    private final Duration phaseOneLoopReadTimeout;
 
     public TwoPhaseCompactor(ServiceConfiguration conf,
                              PulsarClient pulsar,
                              BookKeeper bk,
                              ScheduledExecutorService scheduler) {
         super(conf, pulsar, bk, scheduler);
+        phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
     }
 
     @Override
@@ -116,7 +117,7 @@ public class TwoPhaseCompactor extends Compactor {
         }
         CompletableFuture<RawMessage> future = reader.readNextAsync();
         FutureUtil.addTimeoutHandling(future,
-                PHASE_ONE_LOOP_READ_TIMEOUT, scheduler,
+                phaseOneLoopReadTimeout, scheduler,
                 () -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)"));
 
         future.thenAcceptAsync(m -> {
@@ -399,4 +400,8 @@ public class TwoPhaseCompactor extends Compactor {
             this.latestForKey = latestForKey;
         }
     }
+
+    public long getPhaseOneLoopReadTimeoutInSeconds() {
+        return phaseOneLoopReadTimeout.getSeconds();
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 57a2146..0d1a95c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -39,15 +39,19 @@ import java.util.concurrent.ScheduledExecutorService;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.RawMessageImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -220,6 +224,16 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
         compactor.compact(topic).get();
     }
 
+    @Test
+    public void testPhaseOneLoopTimeConfiguration() {
+        ServiceConfiguration configuration = new ServiceConfiguration();
+        configuration.setBrokerServiceCompactionPhaseOneLoopTimeInSeconds(60);
+        TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration, Mockito.mock(PulsarClientImpl.class),
+                Mockito.mock(BookKeeper.class), compactionScheduler);
+        Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 60);
+
+    }
+
     public ByteBuf extractPayload(RawMessage m) throws Exception {
         ByteBuf payloadAndMetadata = m.getHeadersAndPayload();
         Commands.skipChecksumIfPresent(payloadAndMetadata);