You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/04/08 02:30:08 UTC
[pulsar] branch master updated: Retention policy should be
respected when there is no traffic (#6676)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a658791 Retention policy should be respected when there is no traffic (#6676)
a658791 is described below
commit a658791b4dc696b8ada1fde1a7aa239b251dfda6
Author: feynmanlin <31...@qq.com>
AuthorDate: Wed Apr 8 10:29:53 2020 +0800
Retention policy should be respected when there is no traffic (#6676)
Add a new feature:
Retention policy should be respected when there is no traffic #6655
* Retention policy should be respected when there is no traffic(#6655)
* change parameter name and add annotations
* Reduce the scheduling interval
Co-authored-by: feynmanlin <fe...@tencent.com>
Co-authored-by: Sijie Guo <si...@apache.org>
---
conf/broker.conf | 4 +
.../apache/bookkeeper/mledger/ManagedLedger.java | 7 ++
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 +-
.../apache/pulsar/broker/ServiceConfiguration.java | 6 ++
.../pulsar/broker/service/BrokerService.java | 25 ++++++
.../broker/service/ConsumedLedgersTrimTest.java | 93 ++++++++++++++++++++++
6 files changed, 137 insertions(+), 1 deletion(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index e02e323..eb4d22d 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -346,6 +346,10 @@ maxMessagePublishBufferSizeInMB=
# Use 0 or negative number to disable the max publish buffer limiting.
messagePublishBufferCheckIntervalInMillis=100
+# Interval, in seconds, at which the broker checks whether consumed ledgers can be trimmed
+# Use 0 or negative number to disable the check
+retentionCheckIntervalInSeconds=120
+
### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 548316c..e004c82 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.Beta;
import io.netty.buffer.ByteBuf;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -447,4 +448,10 @@ public interface ManagedLedger {
* Signaling managed ledger that we can resume after BK write failure
*/
void readyToCreateNewLedger();
+
+ /**
+ * Trim consumed ledgers in background.
+ *
+ * @param promise future handed to the background trim task.
+ *        NOTE(review): presumably completed when trimming finishes; confirm against the implementation.
+ */
+ void trimConsumedLedgersInBackground(CompletableFuture<?> promise);
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7a38da1..0bb788b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1834,7 +1834,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
}
- private void trimConsumedLedgersInBackground(CompletableFuture<?> promise) {
+ /**
+ * Submits {@code internalTrimConsumedLedgers(promise)} to {@code executor} via
+ * {@code executeOrdered(name, ...)}, wrapped in {@code safeRun}.
+ * Exposed publicly so it can be invoked through the {@code ManagedLedger} interface.
+ *
+ * @param promise passed unchanged to {@code internalTrimConsumedLedgers}
+ */
+ @Override
+ public void trimConsumedLedgersInBackground(CompletableFuture<?> promise) {
executor.executeOrdered(name, safeRun(() -> internalTrimConsumedLedgers(promise)));
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 3cc068f..f649910 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -634,6 +634,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int messagePublishBufferCheckIntervalInMillis = 100;
+ // Interval, in seconds, between consumed-ledger trim checks (default 120).
+ // A value of 0 or less disables the periodic check.
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Check between intervals to see if consumed ledgers need to be trimmed"
+ )
+ private int retentionCheckIntervalInSeconds = 120;
+
/**** --- Messaging Protocols --- ****/
@FieldContext(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index a28f3a4..cb8f8c0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -78,6 +78,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
@@ -191,6 +192,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private final ScheduledExecutorService messageExpiryMonitor;
private final ScheduledExecutorService compactionMonitor;
private final ScheduledExecutorService messagePublishBufferMonitor;
+ private final ScheduledExecutorService consumedLedgersMonitor;
private ScheduledExecutorService topicPublishRateLimiterMonitor;
private ScheduledExecutorService brokerPublishRateLimiterMonitor;
protected volatile PublishRateLimiter brokerPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
@@ -272,6 +274,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-compaction-monitor"));
this.messagePublishBufferMonitor =
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-publish-buffer-monitor"));
+ this.consumedLedgersMonitor = Executors
+ .newSingleThreadScheduledExecutor(new DefaultThreadFactory("consumed-Ledgers-monitor"));
this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
this.backlogQuotaChecker = Executors
@@ -402,6 +406,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
this.startMessageExpiryMonitor();
this.startCompactionMonitor();
this.startMessagePublishBufferMonitor();
+ this.startConsumedLedgersMonitor();
this.startBacklogQuotaChecker();
this.updateBrokerPublisherThrottlingMaxRate();
this.startCheckReplicationPolicies();
@@ -471,6 +476,14 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
}
+ /**
+  * Schedules the periodic consumed-ledger check on {@code consumedLedgersMonitor}.
+  * Nothing is scheduled when the configured retention check interval is 0 or negative.
+  */
+ protected void startConsumedLedgersMonitor() {
+     final int retentionCheckSeconds = pulsar().getConfiguration().getRetentionCheckIntervalInSeconds();
+     if (retentionCheckSeconds <= 0) {
+         // Check disabled by configuration.
+         return;
+     }
+     // The initial delay equals the period, so the first check runs one full interval after start.
+     consumedLedgersMonitor.scheduleAtFixedRate(
+             safeRun(this::checkConsumedLedgers),
+             retentionCheckSeconds,
+             retentionCheckSeconds,
+             TimeUnit.SECONDS);
+ }
+
protected void startBacklogQuotaChecker() {
if (pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) {
final int interval = pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds();
@@ -1164,6 +1177,18 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
});
}
+ /**
+  * Asks the managed ledger of every persistent topic to trim its consumed ledgers in the background.
+  * Non-persistent topics and topics without a managed ledger are skipped.
+  */
+ private void checkConsumedLedgers() {
+     forEachTopic(topic -> {
+         if (!(topic instanceof PersistentTopic)) {
+             return;
+         }
+         // No caller needs the outcome here, so the shared null promise is passed.
+         Optional.ofNullable(((PersistentTopic) topic).getManagedLedger())
+                 .ifPresent(ledger -> ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE));
+     });
+ }
+
public void checkMessageDeduplicationInfo() {
forEachTopic(Topic::checkMessageDeduplicationInfo);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
new file mode 100644
index 0000000..72c2eff
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+
+import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.junit.Test;
+import org.testng.Assert;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Checks that the periodic retention check trims fully consumed ledgers even when
+ * no new messages are being published, and leaves unconsumed ledgers in place.
+ */
+public class ConsumedLedgersTrimTest extends BrokerTestBase {
+    @Override
+    protected void setup() throws Exception {
+        // No-op: the test calls baseSetup() itself, after setting the retention check interval.
+    }
+
+    @Override
+    protected void cleanup() throws Exception {
+        // No-op
+        // NOTE(review): nothing here tears down what baseSetup() started; confirm this is intended.
+    }
+
+    // NOTE(review): this file imports org.junit.Test but asserts with org.testng.Assert;
+    // confirm the annotation matches the runner used for this module.
+    @Test
+    public void TestConsumedLedgersTrim() throws Exception {
+        // Run the retention check every second so the sleeps below span at least one check.
+        conf.setRetentionCheckIntervalInSeconds(1);
+        super.baseSetup();
+        final String topicName = "persistent://prop/ns-abc/TestConsumedLedgersTrim";
+        final String subscriptionName = "my-subscriber-name";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .producerName("producer-name")
+                .create();
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+                .subscribe();
+        Topic topicRef = pulsar.getBrokerService().getTopicReference(topicName).get();
+        Assert.assertNotNull(topicRef);
+        PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+        // Two entries per ledger and a tiny rollover time: 10 messages are expected to yield 5 ledgers.
+        ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
+        managedLedgerConfig.setRetentionSizeInMB(1L);
+        managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
+        managedLedgerConfig.setMaxEntriesPerLedger(2);
+        managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+
+        int msgNum = 10;
+        for (int i = 0; i < msgNum; i++) {
+            producer.send(new byte[1024 * 1024]);
+        }
+
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), msgNum / 2);
+
+        //no traffic, unconsumed ledger will be retained
+        Thread.sleep(1200);
+        Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), msgNum / 2);
+
+        for (int i = 0; i < msgNum; i++) {
+            Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
+            // assertNotNull reports a clearer failure than assertTrue(msg != null).
+            Assert.assertNotNull(msg);
+            consumer.acknowledge(msg);
+        }
+        Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), msgNum / 2);
+
+        //no traffic, but consumed ledger will be cleaned
+        Thread.sleep(1500);
+        Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
+    }
+}