You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:33:48 UTC

[pulsar] 02/38: Retention policy should be respected when there is no traffic (#6676)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7f0cd1af01d74074a60ec9c6478adf9d59e63bca
Author: feynmanlin <31...@qq.com>
AuthorDate: Wed Apr 8 10:29:53 2020 +0800

    Retention policy should be respected when there is no traffic (#6676)
    
    Add a new feature:
    Retention policy should be respected when there is no traffic #6655
    
    * Retention policy should be respected when there is no traffic(#6655)
    
    * change parameter name and add annotations
    
    * Reduce the scheduling interval
    
    Co-authored-by: feynmanlin <fe...@tencent.com>
    Co-authored-by: Sijie Guo <si...@apache.org>
    (cherry picked from commit a658791b4dc696b8ada1fde1a7aa239b251dfda6)
---
 conf/broker.conf                                   |  4 +
 .../apache/bookkeeper/mledger/ManagedLedger.java   |  7 ++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  7 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 ++
 .../pulsar/broker/service/BrokerService.java       | 25 ++++++
 .../broker/service/ConsumedLedgersTrimTest.java    | 93 ++++++++++++++++++++++
 6 files changed, 138 insertions(+), 4 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 212051a..83d009d 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -336,6 +336,10 @@ maxMessagePublishBufferSizeInMB=
 # Use 0 or negative number to disable the max publish buffer limiting.
 messagePublishBufferCheckIntervalInMillis=100
 
+# Interval, in seconds, at which the broker checks whether consumed ledgers need to be trimmed.
+# Use 0 or negative number to disable the check
+retentionCheckIntervalInSeconds=120
+
 ### --- Authentication --- ###
 # Role names that are treated as "proxy roles". If the broker sees a request with
 #role as proxyRoles - it will demand to see a valid original principal.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index bedb81d..50439f5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.Beta;
 import io.netty.buffer.ByteBuf;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -449,4 +450,10 @@ public interface ManagedLedger {
      * Signaling managed ledger that we can resume after BK write failure
      */
     void readyToCreateNewLedger();
+
+    /**
+     * Trim consumed ledgers in background.
+     * @param promise future handed to the background trimming task
+     */
+    void trimConsumedLedgersInBackground(CompletableFuture<?> promise);
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 644ae34..08b5e5e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1841,10 +1841,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
     }
 
-    private void trimConsumedLedgersInBackground(CompletableFuture<?> promise) {
-        executor.executeOrdered(name, safeRun(() -> {
-            internalTrimConsumedLedgers(promise);
-        }));
+    @Override // Queues internalTrimConsumedLedgers on the executor, keyed by name.
+    public void trimConsumedLedgersInBackground(CompletableFuture<?> promise) {
+        executor.executeOrdered(name, safeRun(() -> internalTrimConsumedLedgers(promise)));
+    }
 
     private void scheduleDeferredTrimming(CompletableFuture<?> promise) {
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 19a4337..d7d0d63 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -621,6 +621,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private int messagePublishBufferCheckIntervalInMillis = 100;
 
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Interval in seconds between checks for consumed ledgers to trim. Use 0 or negative number to disable"
+    )
+    private int retentionCheckIntervalInSeconds = 120;
+
     /**** --- Messaging Protocols --- ****/
 
     @FieldContext(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b1ee242..085bae4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -78,6 +78,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
@@ -189,6 +190,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     private final ScheduledExecutorService messageExpiryMonitor;
     private final ScheduledExecutorService compactionMonitor;
     private final ScheduledExecutorService messagePublishBufferMonitor;
+    private final ScheduledExecutorService consumedLedgersMonitor;
     private ScheduledExecutorService topicPublishRateLimiterMonitor;
     private ScheduledExecutorService brokerPublishRateLimiterMonitor;
     protected volatile PublishRateLimiter brokerPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
@@ -270,6 +272,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-compaction-monitor"));
         this.messagePublishBufferMonitor =
             Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-publish-buffer-monitor"));
+        this.consumedLedgersMonitor = Executors
+                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("consumed-Ledgers-monitor"));
 
         this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
         this.backlogQuotaChecker = Executors
@@ -400,6 +404,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         this.startMessageExpiryMonitor();
         this.startCompactionMonitor();
         this.startMessagePublishBufferMonitor();
+        this.startConsumedLedgersMonitor();
         this.startBacklogQuotaChecker();
         this.updateBrokerPublisherThrottlingMaxRate();
         // register listener to capture zk-latency
@@ -460,6 +465,14 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         }
     }
 
+    protected void startConsumedLedgersMonitor() {
+        int interval = pulsar().getConfiguration().getRetentionCheckIntervalInSeconds();
+        if (interval > 0) { // a non-positive interval leaves the consumed-ledgers check unscheduled
+            consumedLedgersMonitor.scheduleAtFixedRate(safeRun(this::checkConsumedLedgers),
+                                                            interval, interval, TimeUnit.SECONDS);
+        }
+    }
+
     protected void startBacklogQuotaChecker() {
         if (pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) {
             final int interval = pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds();
@@ -1150,6 +1163,18 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             });
     }
 
+    /** Asks every persistent topic's managed ledger, when present, to trim consumed ledgers in background. */
+    private void checkConsumedLedgers() {
+        forEachTopic(topic -> {
+            if (!(topic instanceof PersistentTopic)) {
+                // Only PersistentTopic instances are handled here.
+                return;
+            }
+            Optional.ofNullable(((PersistentTopic) topic).getManagedLedger())
+                    .ifPresent(ledger -> ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE));
+        });
+    }
+
     public void checkMessageDeduplicationInfo() {
         forEachTopic(Topic::checkMessageDeduplicationInfo);
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
new file mode 100644
index 0000000..72c2eff
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+
+import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.junit.Test;
+import org.testng.Assert;
+
+import java.util.concurrent.TimeUnit;
+
+/** Verifies that the periodic retention check trims consumed ledgers while the topic has no traffic. */
+public class ConsumedLedgersTrimTest extends BrokerTestBase {
+    @Override
+    protected void setup() throws Exception {
+        // No-op: the broker is started inside the test, after the check interval has been configured.
+    }
+
+    @org.testng.annotations.AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @org.testng.annotations.Test // TestNG's @Test; the Test imported by this file is JUnit's
+    public void TestConsumedLedgersTrim() throws Exception {
+        conf.setRetentionCheckIntervalInSeconds(1);
+        super.baseSetup();
+        final String topicName = "persistent://prop/ns-abc/TestConsumedLedgersTrim";
+        final String subscriptionName = "my-subscriber-name";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).producerName("producer-name")
+                .create();
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+                .subscribe();
+        Topic topicRef = pulsar.getBrokerService().getTopicReference(topicName).get();
+        Assert.assertNotNull(topicRef);
+        PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+        ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
+        managedLedgerConfig.setRetentionSizeInMB(1L);
+        managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
+        managedLedgerConfig.setMaxEntriesPerLedger(2);
+        managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+
+        int msgNum = 10;
+        for (int i = 0; i < msgNum; i++) {
+            producer.send(new byte[1024 * 1024]);
+        }
+
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), msgNum / 2);
+
+        // No traffic: ledgers that have not been consumed yet must be retained.
+        Thread.sleep(1200);
+        Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), msgNum / 2);
+
+        for (int i = 0; i < msgNum; i++) {
+            Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
+            Assert.assertNotNull(msg);
+            consumer.acknowledge(msg);
+        }
+        Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), msgNum / 2);
+
+        // Still no traffic: the periodic check must now trim the consumed ledgers.
+        Thread.sleep(1500);
+        Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
+    }
+}