You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/01/31 18:22:43 UTC

[GitHub] merlimat closed pull request #1135: Added infinite time retention configuration option

merlimat closed pull request #1135: Added infinite time retention configuration option
URL: https://github.com/apache/incubator-pulsar/pull/1135
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 6f9847bb1..fd81ba188 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -319,6 +319,16 @@ public ManagedLedgerConfig setThrottleMarkDelete(double throttleMarkDelete) {
     }
 
     /**
+     * Set the retention time for the ManagedLedger
+     * <p>
+     * Retention time will prevent data from being deleted for at least the specified amount of time, even if no cursors
+     * are created, or if all the cursors have marked the data for deletion.
+     * <p>
+     * A retention time of 0 (the default) means there is no time-based retention.
+     * <p>
+     * Specifying a negative retention time will cause the data to be retained indefinitely, subject to the
+     * {@link #setRetentionSizeInMB(long)} value.
+     *
      * @param retentionTime
      *            duration for which messages should be retained
      * @param unit
@@ -338,6 +348,15 @@ public long getRetentionTimeMillis() {
     }
 
     /**
+     * The retention size is used to set a maximum retention size quota on the ManagedLedger.
+     * <p>
+     * This setting works in conjunction with {@link #setRetentionTime} and places a max size for retention,
+     * after which the data is deleted.
+     * <p>
+     * A retention size of 0 will cause data to be deleted immediately.
+     * <p>
+     * A retention size of -1 means an unlimited retention size.
+     *
      * @param retentionSizeInMB
      *            quota for message retention
      */
@@ -357,7 +376,7 @@ public long getRetentionSizeInMB() {
     /**
      * Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets
      * corrupted at bookkeeper and managed-cursor is stuck at that ledger.
-     * 
+     *
      * @param autoSkipNonRecoverableData
      */
     public boolean isAutoSkipNonRecoverableData() {
@@ -384,10 +403,10 @@ public ManagedLedgerConfig setMaxUnackedRangesToPersist(int maxUnackedRangesToPe
         this.maxUnackedRangesToPersist = maxUnackedRangesToPersist;
         return this;
     }
-    
+
     /**
      * @return max unacked message ranges up to which it can store in Zookeeper
-     * 
+     *
      */
     public int getMaxUnackedRangesToPersistInZk() {
         return maxUnackedRangesToPersistInZk;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 767d3dc7e..9bbea72a4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -161,7 +161,7 @@
     final static long WaitTimeAfterLedgerCreationFailureMs = 10000;
 
     volatile PositionImpl lastConfirmedEntry;
-    
+
     protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
     protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
 
@@ -1485,10 +1485,21 @@ private void scheduleDeferredTrimming() {
     }
 
     private boolean hasLedgerRetentionExpired(long ledgerTimestamp) {
+        if (config.getRetentionTimeMillis() < 0) {
+            // Negative retention time equates to infinite retention
+            return false;
+        }
+
         long elapsedMs = System.currentTimeMillis() - ledgerTimestamp;
         return elapsedMs > config.getRetentionTimeMillis();
     }
 
+    private boolean isLedgerRetentionOverSizeQuota() {
+        // Handle the -1 size limit as "infinite" size quota
+        return config.getRetentionSizeInMB() > 0
+                && TOTAL_SIZE_UPDATER.get(this) > ((long) config.getRetentionSizeInMB()) * 1024 * 1024;
+    }
+
     /**
      * Checks whether there are ledger that have been fully consumed and deletes them
      *
@@ -1537,7 +1548,7 @@ void internalTrimConsumedLedgers() {
             // skip ledger if retention constraint met
             for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) {
                 boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
-                boolean overRetentionQuota = TOTAL_SIZE_UPDATER.get(this) > ((long) config.getRetentionSizeInMB()) * 1024 * 1024;
+                boolean overRetentionQuota = isLedgerRetentionOverSizeQuota();
 
                 if (log.isDebugEnabled()) {
                     log.debug(
@@ -1714,7 +1725,7 @@ private void asyncDeleteLedger(long ledgerId, long retry) {
             }
         }, null);
     }
-    
+
     private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) {
         List<LedgerInfo> ledgers = Lists.newArrayList(ManagedLedgerImpl.this.ledgers.values());
         AtomicInteger ledgersToDelete = new AtomicInteger(ledgers.size());
@@ -2199,7 +2210,7 @@ public static ManagedLedgerException createManagedLedgerException(int bkErrorCod
             return new ManagedLedgerException(BKException.getMessage(bkErrorCode));
         }
     }
-    
+
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 821061afe..2dcf8cd0f 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -1668,11 +1668,35 @@ public void testDeletionAfterRetention() throws Exception {
         c1.skipEntries(1, IndividualDeletedEntries.Exclude);
         // let retention expire
         Thread.sleep(1000);
-        ml.close();
-        // sleep for trim
-        Thread.sleep(100);
+        ml.internalTrimConsumedLedgers();
+
         assertTrue(ml.getLedgersInfoAsList().size() <= 1);
         assertTrue(ml.getTotalSize() <= "shortmessage".getBytes().length);
+        ml.close();
+    }
+
+    @Test
+    public void testInfiniteRetention() throws Exception {
+        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setRetentionSizeInMB(-1);
+        config.setRetentionTime(-1, TimeUnit.HOURS);
+        config.setMaxEntriesPerLedger(1);
+
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("retention_test_ledger", config);
+        ManagedCursor c1 = ml.openCursor("c1");
+        ml.addEntry("iamaverylongmessagethatshouldberetained".getBytes());
+        c1.skipEntries(1, IndividualDeletedEntries.Exclude);
+        ml.close();
+
+        // reopen ml
+        ml = (ManagedLedgerImpl) factory.open("retention_test_ledger", config);
+        c1 = ml.openCursor("c1");
+        ml.addEntry("shortmessage".getBytes());
+        c1.skipEntries(1, IndividualDeletedEntries.Exclude);
+        ml.close();
+        assertTrue(ml.getLedgersInfoAsList().size() > 1);
+        assertTrue(ml.getTotalSize() > "shortmessage".getBytes().length);
     }
 
     @Test
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 7255bef57..d38fa3b92 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -22,10 +22,11 @@
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
-import java.time.Instant;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.util.*;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -86,7 +87,6 @@
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublisherStats;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
-import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.DateFormatter;
@@ -1323,9 +1323,13 @@ private boolean shouldTopicBeRetained() {
             Optional<Policies> policies = brokerService.pulsar().getConfigurationCache().policiesCache()
                     .get(AdminResource.path(POLICIES, name.getNamespace()));
             // If no policies, the default is to have no retention and delete the inactive topic
-            return policies.map(p -> p.retention_policies)
-                    .map(rp -> System.nanoTime() - lastActive < TimeUnit.MINUTES.toNanos(rp.getRetentionTimeInMinutes()))
-                    .orElse(false).booleanValue();
+            return policies.map(p -> p.retention_policies).map(rp -> {
+                long retentionTime = TimeUnit.MINUTES.toNanos(rp.getRetentionTimeInMinutes());
+
+                // Negative retention time means the topic should be retained indefinitely,
+                // because its own data has to be retained
+                return retentionTime < 0 || (System.nanoTime() - lastActive) < retentionTime;
+            }).orElse(false).booleanValue();
         } catch (Exception e) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Error getting policies", topic);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 9086203c3..93ce4b5c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -642,6 +642,53 @@ public void testGcAndRetentionPolicy() throws Exception {
         assertNull(pulsar.getBrokerService().getTopicReference(topicName));
     }
 
+    /**
+     * A topic whose retention policy is set to -1 (infinite retention) should not be
+     * GCed for inactivity while that policy is in place, and its data should never
+     * be deleted
+     */
+    @Test
+    public void testInfiniteRetentionPolicy() throws Exception {
+        // Retain data forever
+        admin.namespaces().setRetention("prop/use/ns-abc", new RetentionPolicies(-1, -1));
+
+        // 1. Topic is not GCed while infinite retention is configured
+        String topicName = "persistent://prop/use/ns-abc/topic-10";
+        Producer producer = pulsarClient.createProducer(topicName);
+        producer.close();
+
+        assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
+        runGC();
+        // Should not have been deleted, since we have retention
+        assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
+
+
+        // Remove retention
+        admin.namespaces().setRetention("prop/use/ns-abc", new RetentionPolicies(0, 10));
+        Thread.sleep(300);
+
+        // 2. Topic is not GCed with live connection
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        String subName = "sub1";
+        Consumer consumer = pulsarClient.subscribe(topicName, subName, conf);
+
+        runGC();
+        assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
+
+        // 3. Topic with subscription is not GCed even with no connections
+        consumer.close();
+
+        runGC();
+        assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
+
+        // 4. Topic can be GCed after unsubscribe
+        admin.persistentTopics().deleteSubscription(topicName, subName);
+
+        runGC();
+        assertNull(pulsar.getBrokerService().getTopicReference(topicName));
+    }
+
     @Test
     public void testMessageExpiry() throws Exception {
         int messageTTLSecs = 1;
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index e04eef7be..6e84bf629 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -232,7 +232,7 @@ void run() throws PulsarAdminException {
     private class GetAntiAffinityGroup extends CliCommand {
         @Parameter(description = "property/cluster/namespace\n", required = true)
         private java.util.List<String> params;
-        
+
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
@@ -263,14 +263,14 @@ void run() throws PulsarAdminException {
     private class DeleteAntiAffinityGroup extends CliCommand {
         @Parameter(description = "property/cluster/namespace\n", required = true)
         private java.util.List<String> params;
-        
+
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
             admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace);
         }
     }
-    
+
 
     @Parameters(commandDescription = "Enable or disable deduplication for a namespace")
     private class SetDeduplication extends CliCommand {
@@ -300,10 +300,12 @@ void run() throws PulsarAdminException {
         private java.util.List<String> params;
 
         @Parameter(names = { "--time",
-                "-t" }, description = "Retention time in minutes (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w)", required = true)
+                "-t" }, description = "Retention time in minutes (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w). "
+                        + "0 means no retention and -1 means infinite time retention", required = true)
         private String retentionTimeStr;
 
-        @Parameter(names = { "--size", "-s" }, description = "Retention size limit (eg: 10M, 16G)", required = true)
+        @Parameter(names = { "--size", "-s" }, description = "Retention size limit (eg: 10M, 16G, 3T). "
+                + "0 means no retention and -1 means infinite size retention", required = true)
         private String limitStr;
 
         @Override
@@ -625,6 +627,10 @@ private static long validateSizeString(String s) {
         case 'G':
             return Long.parseLong(subStr) * 1024 * 1024 * 1024;
 
+        case 't':
+        case 'T':
+            return Long.parseLong(subStr) * 1024 * 1024 * 1024 * 1024;
+
         default:
             return Long.parseLong(s);
         }
@@ -680,7 +686,7 @@ public CmdNamespaces(PulsarAdmin admin) {
 
         jcommander.addCommand("get-message-ttl", new GetMessageTTL());
         jcommander.addCommand("set-message-ttl", new SetMessageTTL());
-        
+
         jcommander.addCommand("get-anti-affinity-group", new GetAntiAffinityGroup());
         jcommander.addCommand("set-anti-affinity-group", new SetAntiAffinityGroup());
         jcommander.addCommand("get-anti-affinity-namespaces", new GetAntiAffinityNamespaces());
diff --git a/site/_data/cli/pulsar-admin.yaml b/site/_data/cli/pulsar-admin.yaml
index 7ac972f93..eb7c8059b 100644
--- a/site/_data/cli/pulsar-admin.yaml
+++ b/site/_data/cli/pulsar-admin.yaml
@@ -213,9 +213,9 @@ commands:
     argument: property/cluster/namespace
     options:
     - flags: -s, --size
-      description: The retention size limits (for example `10M` or `16G`)
+      description: The retention size limits (for example `10M`, `16G` or `3T`). 0 means no retention and -1 means infinite size retention
     - flags: -t, --time
-      description: "The retention time in minutes, hours, days, or weeks. Examples: `100m`, `13h`, `2d`, `5w`."
+      description: "The retention time in minutes, hours, days, or weeks. Examples: `100m`, `13h`, `2d`, `5w`. 0 means no retention and -1 means infinite time retention"
   - name: unload
     description: Unload a namespace or namespace bundle from the current serving broker.
     argument: property/cluster/namespace
@@ -295,14 +295,14 @@ commands:
     description: Look up a topic from the current serving broker
     argument: persistent://property/cluster/namespace/topic
   - name: bundle-range
-    description: Get the namespace bundle which contains the given topic  
+    description: Get the namespace bundle which contains the given topic
     argument: persistent://property/cluster/namespace/topic
   - name: delete
     description: Delete a topic. The topic cannot be deleted if there are any active subscriptions or producers connected to the topic.
     argument: persistent://property/cluster/namespace/topic
   - name: unload
     description: Unload a topic
-    argument: persistent://property/cluster/namespace/topic    
+    argument: persistent://property/cluster/namespace/topic
   - name: subscriptions
     description: Get the list of subscriptions on the topic
     argument: persistent://property/cluster/namespace/topic


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services