You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/04 05:05:22 UTC

[pulsar] branch master updated: Add control of single topic gc in inactive state. (#7093)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 88babcc  Add control of single topic gc in inactive state. (#7093)
88babcc is described below

commit 88babcc8331508b5c8becc6e216d7c293b4cb86e
Author: Hao Zhang <zh...@cmss.chinamobile.com>
AuthorDate: Thu Jun 4 13:05:06 2020 +0800

    Add control of single topic gc in inactive state. (#7093)
    
    Motivation
    The brokerDeleteInactiveTopicsEnabled parameter applies to all topics, but in some cases, such as topics created by a protocol handler, we need to keep certain topics from being deleted even when they are in an inactive state.
    
    Modifications
    Add a deleteWhileInactive field to AbstractTopic; after creating a new topic, we can call topic.setDeleteWhileInactive(false) to prevent that topic from being deleted by GC.
---
 .../pulsar/broker/service/AbstractTopic.java       | 13 ++++++++++++
 .../service/nonpersistent/NonPersistentTopic.java  |  4 ++++
 .../broker/service/persistent/PersistentTopic.java | 23 ++++++++++------------
 3 files changed, 27 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 54c9796..4a02f1f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -63,6 +63,9 @@ public abstract class AbstractTopic implements Topic {
 
     protected volatile boolean isFenced;
 
+    // When set to false, this inactive topic can not be deleted
+    protected boolean deleteWhileInactive;
+
     // Timestamp of when this topic was last seen active
     protected volatile long lastActive;
 
@@ -95,6 +98,8 @@ public abstract class AbstractTopic implements Topic {
         this.producers = new ConcurrentHashMap<>();
         this.isFenced = false;
         this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
+        this.deleteWhileInactive =
+                brokerService.pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled();
         this.lastActive = System.nanoTime();
         Policies policies = null;
         try {
@@ -460,5 +465,13 @@ public abstract class AbstractTopic implements Topic {
         return getStats(false).bytesOutCounter;
     }
 
+    public boolean isDeleteWhileInactive() {
+        return deleteWhileInactive;
+    }
+
+    public void setDeleteWhileInactive(boolean deleteWhileInactive) {
+        this.deleteWhileInactive = deleteWhileInactive;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index ca26a19..eb8d9d3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -830,6 +830,10 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
 
     @Override
     public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) {
+        if (!deleteWhileInactive) {
+            // This topic is not included in GC
+            return;
+        }
         if (isActive()) {
             lastActive = System.nanoTime();
         } else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3b6ea4a..f5cef0e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -18,16 +18,17 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import com.carrotsearch.hppc.ObjectObjectHashMap;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.FastThreadLocal;
-
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -38,8 +39,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-
+import java.util.function.BiFunction;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -125,15 +127,6 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiFunction;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static org.apache.commons.lang3.StringUtils.isBlank;
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-
 public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback {
 
     // Managed ledger associated with the topic
@@ -1647,6 +1640,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
     @Override
     public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) {
+        if (!deleteWhileInactive) {
+            // This topic is not included in GC
+            return;
+        }
         if (isActive(deleteMode)) {
             lastActive = System.nanoTime();
         } else if (System.nanoTime() - lastActive < TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) {