Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/22 13:33:24 UTC

[GitHub] [pulsar] jangwind opened a new pull request #10326: support truncate topic

jangwind opened a new pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326


   Fixes #9597 
   
   ### Motivation
   
   Add support for truncating all data of a topic without disconnecting its producers and consumers.
   
   ### Modifications
   
   Add an API to truncate a topic's data. It deletes all inactive ledgers even if data retention is enabled. The truncate operation moves all cursors to the end of the topic and deletes all inactive ledgers. It does not roll over the current ledger, so the active ledger is not deleted. Consequently, if a user wants to delete all data from a topic that is receiving no new writes, the remaining data is deleted only after a ledger rollover is triggered.
   
   In the future, we can add a separate command to roll over the ledger manually, decoupled from the truncate API. From the admin side, the rollover API and the truncate API could then be combined to delete both the current active ledger and the inactive ledgers.
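
   The semantics described in this section can be sketched with a toy in-memory model. This is only an illustration under stated assumptions, not the Pulsar implementation: the class `TruncateModel`, its methods, and the way rollover is driven by a `maxEntriesPerLedger` threshold are all hypothetical names chosen for the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of the truncate semantics; hypothetical, not Pulsar code. */
final class TruncateModel {
    private final int maxEntriesPerLedger;                          // assumed rollover threshold
    private final List<Integer> ledgerSizes = new ArrayList<>();    // entries per ledger; the last one is active
    private final Map<String, Integer> cursors = new HashMap<>();   // cursor name -> entries already consumed
    private int totalWritten = 0;

    TruncateModel(int maxEntriesPerLedger) {
        this.maxEntriesPerLedger = maxEntriesPerLedger;
        ledgerSizes.add(0); // there is always exactly one active ledger
    }

    void openCursor(String name) {
        cursors.put(name, totalWritten);
    }

    void addEntry() {
        int active = ledgerSizes.size() - 1;
        if (ledgerSizes.get(active) >= maxEntriesPerLedger) {
            ledgerSizes.add(0); // roll over: the previous ledger becomes inactive
            active++;
        }
        ledgerSizes.set(active, ledgerSizes.get(active) + 1);
        totalWritten++;
    }

    /** Move every cursor to the end of the topic, then drop all ledgers except the active one. */
    void truncate() {
        cursors.replaceAll((name, position) -> totalWritten);
        int activeSize = ledgerSizes.get(ledgerSizes.size() - 1);
        ledgerSizes.clear();
        ledgerSizes.add(activeSize); // the active ledger survives until a later rollover
    }

    int ledgerCount() {
        return ledgerSizes.size();
    }

    int retainedEntries() {
        return ledgerSizes.stream().mapToInt(Integer::intValue).sum();
    }

    int backlog(String cursor) {
        return totalWritten - cursors.get(cursor);
    }
}
```

   In this model, writing five entries with a threshold of two yields three ledgers. After `truncate()` only the active ledger, holding the fifth entry, remains, and every cursor's backlog is zero. The leftover entry is the data that stays until the next rollover.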
   
   ### Verifying this change
   
   
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (yes)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (yes)
     - The admin cli options: (yes)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jangwind commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
jangwind commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r626633227



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -3738,6 +3751,35 @@ public void setEntriesAddedCounter(long count) {
 
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
+    @Override
+    public CompletableFuture<Void> asyncTruncate() {
+
+        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+        for (ManagedCursor cursor : cursors) {
+            final CompletableFuture<Void> future = new CompletableFuture<>();
+            cursor.asyncClearBacklog(new AsyncCallbacks.ClearBacklogCallback() {

Review comment:
       Thank you very much for your comments. I think 'truncate' means deleting all the messages rather than only the consumed messages. I want to add a 'trim' function in the next PR.







[GitHub] [pulsar] gaoran10 commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r620100488



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -3738,6 +3751,35 @@ public void setEntriesAddedCounter(long count) {
 
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
+    @Override
+    public CompletableFuture<Void> asyncTruncate() {
+
+        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+        for (ManagedCursor cursor : cursors) {
+            final CompletableFuture<Void> future = new CompletableFuture<>();
+            cursor.asyncClearBacklog(new AsyncCallbacks.ClearBacklogCallback() {

Review comment:
       Won't clearing the backlog lose unacknowledged messages?







[GitHub] [pulsar] gaoran10 commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r619753621



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
##########
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger;
 
+import java.util.concurrent.CompletableFuture;

Review comment:
       Is this import necessary?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3922,4 +3922,89 @@ protected void internalHandleResult(AsyncResponse asyncResponse,
             }
         }
     }
+
+    protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean authoritative) {
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {
+            Topic topic;
+            try {
+                validateAdminAccessForTenant(topicName.getTenant());
+                validateTopicOwnership(topicName, authoritative);
+                topic = getTopicReference(topicName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
+            CompletableFuture<Void> future = topic.truncate();
+            future.thenAccept(a -> {
+                asyncResponse.resume(new RestException(Response.Status.NO_CONTENT.getStatusCode(),
+                        Response.Status.NO_CONTENT.getReasonPhrase()));
+            }).exceptionally(e -> {
+                asyncResponse.resume(e);
+                return null;
+            });
+        } else {
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false).whenComplete((meta, t) -> {
+                if (meta.partitions > 0) {
+                    final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+                    for (int i = 0; i < meta.partitions; i++) {
+                        TopicName topicNamePartition = topicName.getPartition(i);
+                        try {
+                            futures.add(pulsar().getAdminClient().topics()
+                                .truncateAsync(topicNamePartition.toString()));
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to truncate topic {}", clientAppId(), topicNamePartition, e);
+                            asyncResponse.resume(new RestException(e));
+                            return;
+                        }
+                    }
+                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                        if (exception != null) {
+                            Throwable th = exception.getCause();
+                            if (th instanceof NotFoundException) {
+                                asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage()));
+                            } else if (th instanceof WebApplicationException) {
+                                asyncResponse.resume(th);
+                            } else {
+                                log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, exception);
+                                asyncResponse.resume(new RestException(exception));
+                            }
+                        } else {
+                            asyncResponse.resume(Response.noContent().build());
+                        }
+                        return null;
+                    });
+                } else {
+                    Topic topic;
+                    try {
+                        validateAdminAccessForTenant(topicName.getTenant());
+                        validateTopicOwnership(topicName, authoritative);
+                        topic = getTopicReference(topicName);
+                    } catch (Exception e) {
+                        log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, e);
+                        resumeAsyncResponseExceptionally(asyncResponse, e);
+                        return;
+                    }
+                    CompletableFuture<Void> future = topic.truncate();
+                    future.thenAccept(a -> {
+                        asyncResponse.resume(new RestException(Response.Status.NO_CONTENT.getStatusCode(),
+                                Response.Status.NO_CONTENT.getReasonPhrase()));
+                    }).exceptionally(e -> {
+                        asyncResponse.resume(e);
+                        return null;
+                    });

Review comment:
       It seems that this block repeats the one above.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
##########
@@ -600,4 +600,10 @@ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.Upd
      * will got null if corresponding ledger not exists.
      */
     CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId);
+
+    /**
+     * Truncate ledgers
+     * The latest ledger cannot be deleted ,and only delete acknowledged ledgers

Review comment:
       Only delete ledgers before the mark delete position, right?

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -3738,6 +3751,35 @@ public void setEntriesAddedCounter(long count) {
 
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
+    @Override
+    public CompletableFuture<Void> asyncTruncate() {
+
+        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+        for (ManagedCursor cursor : cursors) {
+            final CompletableFuture<Void> future = new CompletableFuture<>();
+            cursor.asyncClearBacklog(new AsyncCallbacks.ClearBacklogCallback() {

Review comment:
       Why do we need to clear the backlog?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3922,4 +3922,89 @@ protected void internalHandleResult(AsyncResponse asyncResponse,
             }
         }
     }
+
+    protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean authoritative) {
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {
+            Topic topic;
+            try {
+                validateAdminAccessForTenant(topicName.getTenant());
+                validateTopicOwnership(topicName, authoritative);
+                topic = getTopicReference(topicName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
+            CompletableFuture<Void> future = topic.truncate();
+            future.thenAccept(a -> {
+                asyncResponse.resume(new RestException(Response.Status.NO_CONTENT.getStatusCode(),
+                        Response.Status.NO_CONTENT.getReasonPhrase()));
+            }).exceptionally(e -> {
+                asyncResponse.resume(e);
+                return null;
+            });

Review comment:
       It seems that this block is repeated.







[GitHub] [pulsar] tuteng commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
tuteng commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r619931462



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2316,10 +2329,10 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {
                     log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name,
                             ls.getLedgerId());
                     break;
-                } else if (expired) {
+                } else if (expired || isTruncate) {
                     log.debug("[{}] Ledger {} has expired, ts {}", name, ls.getLedgerId(), ls.getTimestamp());
                     ledgersToDelete.add(ls);
-                } else if (overRetentionQuota) {
+                } else if (overRetentionQuota || isTruncate) {
                     log.debug("[{}] Ledger {} is over quota", name, ls.getLedgerId());

Review comment:
       When `overRetentionQuota` is `false` and `isTruncate` is `true`, it seems that the debug message needs to be updated.
   
   

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2316,10 +2329,10 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {
                     log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name,
                             ls.getLedgerId());
                     break;
-                } else if (expired) {
+                } else if (expired || isTruncate) {
                     log.debug("[{}] Ledger {} has expired, ts {}", name, ls.getLedgerId(), ls.getTimestamp());

Review comment:
       When `expired` is `false` and `isTruncate` is `true`, it seems that the debug message needs to be updated.
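
       One way to keep the debug output accurate is to derive the message from the condition that actually matched. The following is a minimal sketch only: `DeletionReason.describe` is a hypothetical helper that does not exist in ManagedLedgerImpl, and the message wording is an assumption.

```java
/** Hypothetical helper: picks a log fragment matching the real reason a ledger is deleted. */
final class DeletionReason {
    private DeletionReason() {
    }

    static String describe(boolean expired, boolean overRetentionQuota, boolean isTruncate) {
        // Keep the precedence of the existing branches: expiry first, then quota.
        if (expired) {
            return "has expired";
        } else if (overRetentionQuota) {
            return "is over quota";
        } else if (isTruncate) {
            // Neither retention rule matched; the ledger goes away only because of truncate.
            return "is deleted by truncate";
        }
        return "is retained";
    }
}
```

       With such a helper, a single `log.debug("[{}] Ledger {} {}", name, ls.getLedgerId(), reason)` call could replace the two branch-specific messages.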







[GitHub] [pulsar] jangwind commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
jangwind commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r619168739



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3922,4 +3922,66 @@ protected void internalHandleResult(AsyncResponse asyncResponse,
             }
         }
     }
+
+    protected void internalTruncateTopic(AsyncResponse asyncResponse) {
+

Review comment:
       I see. Thank you very much. I will change the code to check the topic ownership first and then get the topic reference. I am also moving the implementation from brokerService to Topic.







[GitHub] [pulsar] MarvinCai commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r629472720



##########
File path: managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
##########
@@ -2917,4 +2916,59 @@ public void testManagedLedgerRollOverIfFull() throws Exception {
         Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1);
         Assert.assertEquals(ledger.getTotalSize(), 0);
     }
+
+    @Test(timeOut = 20000)
+    public void testAsyncTruncateLedgerRetention() throws Exception {
+
+        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setRetentionSizeInMB(50);
+        config.setRetentionTime(1, TimeUnit.DAYS);
+
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("truncate_ledger", config);
+        ManagedCursor cursor = ledger.openCursor("test-cursor");
+        ledger.addEntry("test-entry-1".getBytes(Encoding));
+        ledger.addEntry("test-entry-1".getBytes(Encoding));
+        ledger.addEntry("test-entry-1".getBytes(Encoding));
+        ledger.addEntry("test-entry-1".getBytes(Encoding));
+        ledger.addEntry("test-entry-1".getBytes(Encoding));
+
+        ledger.close();
+        ManagedLedgerImpl ledger2 = (ManagedLedgerImpl)factory.open("truncate_ledger", config);
+        ledger2.addEntry("test-entry-2".getBytes(Encoding));
+
+
+        CompletableFuture<Void> future = ledger2.asyncTruncate();
+        future.get();
+
+        assertTrue(ledger2.getLedgersInfoAsList().size() <= 1);

Review comment:
       Can we use setMaxEntriesPerLedger to make the managed ledger create new ledgers? Then, even though the current ledger won't be deleted, we can verify that the older ledgers get removed.







[GitHub] [pulsar] jangwind commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
jangwind commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r619406570



##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
##########
@@ -286,6 +287,19 @@ void run() throws PulsarAdminException {
         }
     }
 
+    @Parameters(commandDescription = "Truncate a topic. \n"
+            + "\t\tThe topic will be truncate, but the latest ledger cannot be deleted.")

Review comment:
       Thank you very much, I will fix it as soon as possible.







[GitHub] [pulsar] jangwind commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
jangwind commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r619809608



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
##########
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger;
 
+import java.util.concurrent.CompletableFuture;

Review comment:
       Sorry, I added this import in a previous modification. I will remove it.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3922,4 +3922,89 @@ protected void internalHandleResult(AsyncResponse asyncResponse,
             }
         }
     }
+
+    protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean authoritative) {
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {
+            Topic topic;
+            try {
+                validateAdminAccessForTenant(topicName.getTenant());
+                validateTopicOwnership(topicName, authoritative);
+                topic = getTopicReference(topicName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
+            CompletableFuture<Void> future = topic.truncate();
+            future.thenAccept(a -> {
+                asyncResponse.resume(new RestException(Response.Status.NO_CONTENT.getStatusCode(),
+                        Response.Status.NO_CONTENT.getReasonPhrase()));
+            }).exceptionally(e -> {
+                asyncResponse.resume(e);
+                return null;
+            });

Review comment:
       Thank you for your comment. This block handles the case topicName.isPartitioned() == true, and the other handles meta.partitions == 0. I will merge them into one function.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3922,4 +3922,89 @@ protected void internalHandleResult(AsyncResponse asyncResponse,
             }
         }
     }
+
+    protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean authoritative) {
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {
+            Topic topic;
+            try {
+                validateAdminAccessForTenant(topicName.getTenant());
+                validateTopicOwnership(topicName, authoritative);
+                topic = getTopicReference(topicName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
+            CompletableFuture<Void> future = topic.truncate();
+            future.thenAccept(a -> {
+                asyncResponse.resume(new RestException(Response.Status.NO_CONTENT.getStatusCode(),
+                        Response.Status.NO_CONTENT.getReasonPhrase()));
+            }).exceptionally(e -> {
+                asyncResponse.resume(e);
+                return null;
+            });
+        } else {
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false).whenComplete((meta, t) -> {
+                if (meta.partitions > 0) {
+                    final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+                    for (int i = 0; i < meta.partitions; i++) {
+                        TopicName topicNamePartition = topicName.getPartition(i);
+                        try {
+                            futures.add(pulsar().getAdminClient().topics()
+                                .truncateAsync(topicNamePartition.toString()));
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to truncate topic {}", clientAppId(), topicNamePartition, e);
+                            asyncResponse.resume(new RestException(e));
+                            return;
+                        }
+                    }
+                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                        if (exception != null) {
+                            Throwable th = exception.getCause();
+                            if (th instanceof NotFoundException) {
+                                asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage()));
+                            } else if (th instanceof WebApplicationException) {
+                                asyncResponse.resume(th);
+                            } else {
+                                log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, exception);
+                                asyncResponse.resume(new RestException(exception));
+                            }
+                        } else {
+                            asyncResponse.resume(Response.noContent().build());
+                        }
+                        return null;
+                    });
+                } else {
+                    Topic topic;
+                    try {
+                        validateAdminAccessForTenant(topicName.getTenant());
+                        validateTopicOwnership(topicName, authoritative);
+                        topic = getTopicReference(topicName);
+                    } catch (Exception e) {
+                        log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, e);
+                        resumeAsyncResponseExceptionally(asyncResponse, e);
+                        return;
+                    }
+                    CompletableFuture<Void> future = topic.truncate();
+                    future.thenAccept(a -> {
+                        asyncResponse.resume(new RestException(Response.Status.NO_CONTENT.getStatusCode(),
+                                Response.Status.NO_CONTENT.getReasonPhrase()));
+                    }).exceptionally(e -> {
+                        asyncResponse.resume(e);
+                        return null;
+                    });

Review comment:
       Thank you for your comment. This block handles the case topicName.isPartitioned() == true, and the other handles meta.partitions == 0. I will merge them into one function.
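
       A rough sketch of what a shared helper for the two identical branches might look like. This is not the code from the PR. `Responder` and `TruncatableTopic` are hypothetical stand-ins for `AsyncResponse` and `Topic`, `NO_CONTENT` is a placeholder value, and the `Callable` argument is assumed to wrap the validation and `getTopicReference` steps.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class TruncateHandlerSketch {
    /** Hypothetical stand-in for the JAX-RS AsyncResponse used in the diff. */
    interface Responder {
        void resume(Object result);
    }

    /** Hypothetical stand-in for the broker's Topic, reduced to the one call the diff uses. */
    interface TruncatableTopic {
        CompletableFuture<Void> truncate();
    }

    static final Object NO_CONTENT = "204 No Content"; // placeholder for the real response object

    private TruncateHandlerSketch() {
    }

    /** Shared path for both branches: resolve the topic, truncate it, then answer the request once. */
    static void truncateNonPartitioned(Responder response, Callable<TruncatableTopic> resolveTopic) {
        TruncatableTopic topic;
        try {
            // In the real code this step would cover the access and ownership checks.
            topic = resolveTopic.call();
        } catch (Exception e) {
            response.resume(e);
            return;
        }
        topic.truncate()
                .thenAccept(ignored -> response.resume(NO_CONTENT))
                .exceptionally(e -> {
                    // Dependent stages wrap the original failure in CompletionException; unwrap it.
                    Throwable cause = (e instanceof CompletionException && e.getCause() != null)
                            ? e.getCause() : e;
                    response.resume(cause);
                    return null;
                });
    }
}
```

       Both the `topicName.isPartitioned()` branch and the `meta.partitions == 0` branch could then call this single method, so the error handling lives in one place.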

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -3738,6 +3751,35 @@ public void setEntriesAddedCounter(long count) {
 
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
+    @Override
+    public CompletableFuture<Void> asyncTruncate() {
+
+        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+        for (ManagedCursor cursor : cursors) {
+            final CompletableFuture<Void> future = new CompletableFuture<>();
+            cursor.asyncClearBacklog(new AsyncCallbacks.ClearBacklogCallback() {

Review comment:
       If it doesn't clear the backlog, the operation is a trim rather than a truncate. That is why I clear it.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
##########
@@ -600,4 +600,10 @@ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.Upd
      * will got null if corresponding ledger not exists.
      */
     CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId);
+
+    /**
+     * Truncate ledgers
+     * The latest ledger cannot be deleted ,and only delete acknowledged ledgers

Review comment:
       You are right. The description has been changed to ‘The truncate operation will move all cursors to the end of the topic and delete all inactive ledgers.’







[GitHub] [pulsar] jangwind commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
jangwind commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r629345400



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2156,6 +2157,14 @@ private void scheduleDeferredTrimming(CompletableFuture<?> promise) {
         scheduledExecutor.schedule(safeRun(() -> trimConsumedLedgersInBackground(promise)), 100, TimeUnit.MILLISECONDS);

Review comment:
       I reviewed the code. This function is private and unused in the ManagedLedgerImpl class, so it should be removed.







[GitHub] [pulsar] MarvinCai commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r620137588



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -3738,6 +3751,35 @@ public void setEntriesAddedCounter(long count) {
 
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
+    @Override
+    public CompletableFuture<Void> asyncTruncate() {
+
+        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+        for (ManagedCursor cursor : cursors) {
+            final CompletableFuture<Void> future = new CompletableFuture<>();
+            cursor.asyncClearBacklog(new AsyncCallbacks.ClearBacklogCallback() {

Review comment:
       The original request is to delete retained messages. Retained messages are acknowledged messages; messages in the backlog are unacknowledged.
   There are dedicated commands to clear the whole backlog or to expire certain messages in it. This command should delete as many retained messages (messages already consumed and acknowledged by all consumers) as possible, so we shouldn't clear the backlog here.
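
       To make the distinction concrete, here is a small sketch of which ledgers each interpretation would select. It is a simplified model under assumptions, not ManagedLedgerImpl logic: ledgers are reduced to ascending ids, the slowest cursor's mark-delete position is reduced to a ledger id, retention settings are ignored, and `TrimVsTruncate` and its method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical comparison of the two interpretations discussed in this thread. */
final class TrimVsTruncate {
    private TrimVsTruncate() {
    }

    /** Trim: only ledgers fully acknowledged by every cursor, i.e. before the slowest mark-delete ledger. */
    static List<Long> trimCandidates(List<Long> ledgerIds, long slowestMarkDeleteLedgerId, long activeLedgerId) {
        return ledgerIds.stream()
                .filter(id -> id != activeLedgerId && id < slowestMarkDeleteLedgerId)
                .collect(Collectors.toList());
    }

    /** Truncate as implemented in the PR: cursors jump to the end, so every inactive ledger qualifies. */
    static List<Long> truncateCandidates(List<Long> ledgerIds, long activeLedgerId) {
        return ledgerIds.stream()
                .filter(id -> id != activeLedgerId)
                .collect(Collectors.toList());
    }
}
```

       For ledgers 1 to 4 with ledger 4 active and the slowest cursor in ledger 3, the trim variant selects ledgers 1 and 2, while the truncate variant also selects ledger 3 and therefore drops its unacknowledged messages.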







[GitHub] [pulsar] MarvinCai commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r620142722



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2156,6 +2157,14 @@ private void scheduleDeferredTrimming(CompletableFuture<?> promise) {
         scheduledExecutor.schedule(safeRun(() -> trimConsumedLedgersInBackground(promise)), 100, TimeUnit.MILLISECONDS);

Review comment:
       Are we still using this method? If not, we can just remove it.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2316,10 +2329,10 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {
                     log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name,
                             ls.getLedgerId());
                     break;
-                } else if (expired) {
+                } else if (expired || isTruncate) {
                     log.debug("[{}] Ledger {} has expired, ts {}", name, ls.getLedgerId(), ls.getTimestamp());

Review comment:
       +1, please update the log message.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -3738,6 +3751,35 @@ public void setEntriesAddedCounter(long count) {
 
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
+    @Override
+    public CompletableFuture<Void> asyncTruncate() {
+
+        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+        for (ManagedCursor cursor : cursors) {
+            final CompletableFuture<Void> future = new CompletableFuture<>();
+            cursor.asyncClearBacklog(new AsyncCallbacks.ClearBacklogCallback() {

Review comment:
       The original request is to delete retained messages. Retained messages are acknowledged messages; messages in the backlog are unacknowledged messages.
    There are dedicated commands to clear the whole backlog or to expire certain messages in the backlog. This command should delete only retained messages, so we shouldn't clear the backlog here.







[GitHub] [pulsar] gaoran10 commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r619753621



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
##########
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger;
 
+import java.util.concurrent.CompletableFuture;

Review comment:
       Is this import necessary?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3922,4 +3922,89 @@ protected void internalHandleResult(AsyncResponse asyncResponse,
             }
         }
     }
+
+    protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean authoritative) {
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {
+            Topic topic;
+            try {
+                validateAdminAccessForTenant(topicName.getTenant());
+                validateTopicOwnership(topicName, authoritative);
+                topic = getTopicReference(topicName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
+            CompletableFuture<Void> future = topic.truncate();
+            future.thenAccept(a -> {
+                asyncResponse.resume(new RestException(Response.Status.NO_CONTENT.getStatusCode(),
+                        Response.Status.NO_CONTENT.getReasonPhrase()));
+            }).exceptionally(e -> {
+                asyncResponse.resume(e);
+                return null;
+            });
+        } else {
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false).whenComplete((meta, t) -> {
+                if (meta.partitions > 0) {
+                    final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+                    for (int i = 0; i < meta.partitions; i++) {
+                        TopicName topicNamePartition = topicName.getPartition(i);
+                        try {
+                            futures.add(pulsar().getAdminClient().topics()
+                                .truncateAsync(topicNamePartition.toString()));
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to truncate topic {}", clientAppId(), topicNamePartition, e);
+                            asyncResponse.resume(new RestException(e));
+                            return;
+                        }
+                    }
+                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                        if (exception != null) {
+                            Throwable th = exception.getCause();
+                            if (th instanceof NotFoundException) {
+                                asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage()));
+                            } else if (th instanceof WebApplicationException) {
+                                asyncResponse.resume(th);
+                            } else {
+                                log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, exception);
+                                asyncResponse.resume(new RestException(exception));
+                            }
+                        } else {
+                            asyncResponse.resume(Response.noContent().build());
+                        }
+                        return null;
+                    });
+                } else {
+                    Topic topic;
+                    try {
+                        validateAdminAccessForTenant(topicName.getTenant());
+                        validateTopicOwnership(topicName, authoritative);
+                        topic = getTopicReference(topicName);
+                    } catch (Exception e) {
+                        log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, e);
+                        resumeAsyncResponseExceptionally(asyncResponse, e);
+                        return;
+                    }
+                    CompletableFuture<Void> future = topic.truncate();
+                    future.thenAccept(a -> {
+                        asyncResponse.resume(new RestException(Response.Status.NO_CONTENT.getStatusCode(),
+                                Response.Status.NO_CONTENT.getReasonPhrase()));
+                    }).exceptionally(e -> {
+                        asyncResponse.resume(e);
+                        return null;
+                    });

Review comment:
       It seems that this block repeats the one above.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
##########
@@ -600,4 +600,10 @@ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.Upd
      * will got null if corresponding ledger not exists.
      */
     CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId);
+
+    /**
+     * Truncate ledgers
+     * The latest ledger cannot be deleted ,and only delete acknowledged ledgers

Review comment:
       Only delete ledgers before the mark delete position, right?

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -3738,6 +3751,35 @@ public void setEntriesAddedCounter(long count) {
 
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
+    @Override
+    public CompletableFuture<Void> asyncTruncate() {
+
+        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+        for (ManagedCursor cursor : cursors) {
+            final CompletableFuture<Void> future = new CompletableFuture<>();
+            cursor.asyncClearBacklog(new AsyncCallbacks.ClearBacklogCallback() {

Review comment:
       Why do we need to clear the backlog?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3922,4 +3922,89 @@ protected void internalHandleResult(AsyncResponse asyncResponse,
             }
         }
     }
+
+    protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean authoritative) {
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {
+            Topic topic;
+            try {
+                validateAdminAccessForTenant(topicName.getTenant());
+                validateTopicOwnership(topicName, authoritative);
+                topic = getTopicReference(topicName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
+            CompletableFuture<Void> future = topic.truncate();
+            future.thenAccept(a -> {
+                asyncResponse.resume(new RestException(Response.Status.NO_CONTENT.getStatusCode(),
+                        Response.Status.NO_CONTENT.getReasonPhrase()));
+            }).exceptionally(e -> {
+                asyncResponse.resume(e);
+                return null;
+            });

Review comment:
       It seems that this block is repeated.







[GitHub] [pulsar] jangwind commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
jangwind commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r619815086



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3922,4 +3922,89 @@ protected void internalHandleResult(AsyncResponse asyncResponse,
             }
         }
     }
+
+    protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean authoritative) {
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {
+            Topic topic;
+            try {
+                validateAdminAccessForTenant(topicName.getTenant());
+                validateTopicOwnership(topicName, authoritative);
+                topic = getTopicReference(topicName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
+            CompletableFuture<Void> future = topic.truncate();
+            future.thenAccept(a -> {
+                asyncResponse.resume(new RestException(Response.Status.NO_CONTENT.getStatusCode(),
+                        Response.Status.NO_CONTENT.getReasonPhrase()));
+            }).exceptionally(e -> {
+                asyncResponse.resume(e);
+                return null;
+            });

Review comment:
       Thank you for your comment. This block is for the case topicName.isPartitioned() == true, and the other is for meta.partitions == 0. I will merge them into a single function.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3922,4 +3922,89 @@ protected void internalHandleResult(AsyncResponse asyncResponse,
             }
         }
     }
+
+    protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean authoritative) {
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {
+            Topic topic;
+            try {
+                validateAdminAccessForTenant(topicName.getTenant());
+                validateTopicOwnership(topicName, authoritative);
+                topic = getTopicReference(topicName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
+            CompletableFuture<Void> future = topic.truncate();
+            future.thenAccept(a -> {
+                asyncResponse.resume(new RestException(Response.Status.NO_CONTENT.getStatusCode(),
+                        Response.Status.NO_CONTENT.getReasonPhrase()));
+            }).exceptionally(e -> {
+                asyncResponse.resume(e);
+                return null;
+            });
+        } else {
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false).whenComplete((meta, t) -> {
+                if (meta.partitions > 0) {
+                    final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+                    for (int i = 0; i < meta.partitions; i++) {
+                        TopicName topicNamePartition = topicName.getPartition(i);
+                        try {
+                            futures.add(pulsar().getAdminClient().topics()
+                                .truncateAsync(topicNamePartition.toString()));
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to truncate topic {}", clientAppId(), topicNamePartition, e);
+                            asyncResponse.resume(new RestException(e));
+                            return;
+                        }
+                    }
+                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                        if (exception != null) {
+                            Throwable th = exception.getCause();
+                            if (th instanceof NotFoundException) {
+                                asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage()));
+                            } else if (th instanceof WebApplicationException) {
+                                asyncResponse.resume(th);
+                            } else {
+                                log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, exception);
+                                asyncResponse.resume(new RestException(exception));
+                            }
+                        } else {
+                            asyncResponse.resume(Response.noContent().build());
+                        }
+                        return null;
+                    });
+                } else {
+                    Topic topic;
+                    try {
+                        validateAdminAccessForTenant(topicName.getTenant());
+                        validateTopicOwnership(topicName, authoritative);
+                        topic = getTopicReference(topicName);
+                    } catch (Exception e) {
+                        log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, e);
+                        resumeAsyncResponseExceptionally(asyncResponse, e);
+                        return;
+                    }
+                    CompletableFuture<Void> future = topic.truncate();
+                    future.thenAccept(a -> {
+                        asyncResponse.resume(new RestException(Response.Status.NO_CONTENT.getStatusCode(),
+                                Response.Status.NO_CONTENT.getReasonPhrase()));
+                    }).exceptionally(e -> {
+                        asyncResponse.resume(e);
+                        return null;
+                    });

Review comment:
       Thank you for your comment. This block is for the case topicName.isPartitioned() == true, and the other is for meta.partitions == 0. I will merge them into a single function.
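
The merge described here could look roughly like the sketch below. It is not the PR's code: `TruncateHandlerSketch`, `truncateSingleTopic`, and the `Validator` and `Responder` interfaces are hypothetical stand-ins for the broker's validation calls and the JAX-RS `AsyncResponse`, introduced only so the example compiles on its own. The `"NO_CONTENT"` string is a placeholder for a 204 response.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class TruncateHandlerSketch {

    /** Hypothetical stand-in for the JAX-RS AsyncResponse. */
    public interface Responder {
        void resume(Object result);
    }

    /** Hypothetical stand-in for the admin-access and ownership checks. */
    public interface Validator {
        void validate() throws Exception;
    }

    /**
     * Shared path for both branches: the name is already a partition name,
     * or the metadata lookup reported zero partitions.
     */
    public static void truncateSingleTopic(Validator validator,
                                           Supplier<CompletableFuture<Void>> truncate,
                                           Responder responder) {
        try {
            // Validate before touching the topic, so a failure never starts a truncate.
            validator.validate();
        } catch (Exception e) {
            responder.resume(e);
            return;
        }
        truncate.get()
                .thenAccept(ignored -> responder.resume("NO_CONTENT"))
                .exceptionally(e -> {
                    responder.resume(e);
                    return null;
                });
    }

    public static void main(String[] args) {
        List<Object> results = new ArrayList<>();
        truncateSingleTopic(() -> { },
                () -> CompletableFuture.completedFuture(null),
                results::add);
        System.out.println(results);
    }
}
```

With a helper of this shape, each branch of the handler reduces to a single call, and the validation and response handling live in one place.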







[GitHub] [pulsar] jangwind commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
jangwind commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r619809608



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
##########
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger;
 
+import java.util.concurrent.CompletableFuture;

Review comment:
       Sorry, I added this import in a previous modification. I will remove it.







[GitHub] [pulsar] jangwind commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
jangwind commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r619406784



##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
##########
@@ -484,6 +485,19 @@ void run() throws PulsarAdminException {
         }
     }
 
+    @Parameters(commandDescription = "Truncate a topic. \n"
+            + "\t\tThe topic will be truncate, but the latest ledger cannot be deleted.")

Review comment:
       Thank you very much, I will fix it as soon as possible.







[GitHub] [pulsar] jangwind commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
jangwind commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r619164231



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
##########
@@ -57,18 +57,15 @@
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
-import org.apache.pulsar.common.policies.data.AuthAction;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.policies.data.*;

Review comment:
       You're right. I've changed my IDE configuration, and I'll fix these problems right away.

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
##########
@@ -18,50 +18,17 @@
  */
 package org.apache.pulsar.broker.admin;
 
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-import com.google.common.collect.BoundType;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
+import com.google.common.collect.*;

Review comment:
       You're right. I've changed my IDE configuration, and I'll fix these problems right away.







[GitHub] [pulsar] codelipenghui commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r630034482



##########
File path: managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
##########
@@ -2917,4 +2916,59 @@ public void testManagedLedgerRollOverIfFull() throws Exception {
         Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1);
         Assert.assertEquals(ledger.getTotalSize(), 0);
     }
+
+    @Test(timeOut = 20000)
+    public void testAsyncTruncateLedgerRetention() throws Exception {
+
+        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setRetentionSizeInMB(50);
+        config.setRetentionTime(1, TimeUnit.DAYS);
+
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("truncate_ledger", config);
+        ManagedCursor cursor = ledger.openCursor("test-cursor");
+        ledger.addEntry("test-entry-1".getBytes(Encoding));
+        ledger.addEntry("test-entry-1".getBytes(Encoding));
+        ledger.addEntry("test-entry-1".getBytes(Encoding));
+        ledger.addEntry("test-entry-1".getBytes(Encoding));
+        ledger.addEntry("test-entry-1".getBytes(Encoding));
+
+        ledger.close();
+        ManagedLedgerImpl ledger2 = (ManagedLedgerImpl)factory.open("truncate_ledger", config);
+        ledger2.addEntry("test-entry-2".getBytes(Encoding));
+
+
+        CompletableFuture<Void> future = ledger2.asyncTruncate();
+        future.get();
+
+        assertTrue(ledger2.getLedgersInfoAsList().size() <= 1);

Review comment:
       @MarvinCai We have a minimum rollover time limitation; the smallest value we are able to set is 1 minute. So I think `setMaxEntriesPerLedger` may not work here.
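
The constraint can be sketched as follows, assuming that a full ledger is switched only once a minimum rollover time has elapsed since it was created. `RolloverSketch`, `shouldRollover`, and its parameters are hypothetical and deliberately simplified; the real check in the managed ledger takes more inputs (for example ledger size and a maximum rollover time).

```java
public class RolloverSketch {

    /**
     * Simplified model: a ledger rolls over only when it is full AND it is
     * older than the minimum rollover time (if one is configured).
     */
    public static boolean shouldRollover(long entries, long maxEntriesPerLedger,
                                         long ledgerAgeMs, long minRolloverMs) {
        boolean full = entries >= maxEntriesPerLedger;
        boolean oldEnough = minRolloverMs <= 0 || ledgerAgeMs > minRolloverMs;
        return full && oldEnough;
    }

    public static void main(String[] args) {
        // Five entries with a one-entry limit, but the ledger is only 100 ms old
        // and the minimum rollover time is one minute.
        System.out.println(shouldRollover(5, 1, 100, 60_000));
        // Same ledger once it is older than one minute.
        System.out.println(shouldRollover(5, 1, 61_000, 60_000));
    }
}
```

Under this model, lowering the entry limit alone cannot force a rollover inside a 20-second test, which is the concern raised in the comment.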







[GitHub] [pulsar] MarvinCai commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r620137588



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -3738,6 +3751,35 @@ public void setEntriesAddedCounter(long count) {
 
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
+    @Override
+    public CompletableFuture<Void> asyncTruncate() {
+
+        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+        for (ManagedCursor cursor : cursors) {
+            final CompletableFuture<Void> future = new CompletableFuture<>();
+            cursor.asyncClearBacklog(new AsyncCallbacks.ClearBacklogCallback() {

Review comment:
       The original request is to delete retained messages. Retained messages are acknowledged messages; messages in the backlog are unacknowledged messages.
   There are already dedicated commands to clear the whole backlog or to expire certain messages in the backlog. This command should delete as many retained messages (messages already consumed and acknowledged by all consumers) as possible, so we shouldn't clear the backlog here.
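
To illustrate the distinction, here is a minimal sketch that selects only ledgers every cursor has fully consumed and leaves the backlog untouched. It is a simplified model, not the `ManagedLedgerImpl` logic: `RetainedLedgerSketch`, `ledgersSafeToDelete`, and the use of bare ledger IDs in place of mark-delete positions are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RetainedLedgerSketch {

    /**
     * @param ledgerIds           ledger IDs in ascending order; the last one is being written to
     * @param markDeleteLedgerIds for each cursor, the ID of the ledger holding its mark-delete position
     * @return ledgers that hold only acknowledged (retained) messages
     */
    public static List<Long> ledgersSafeToDelete(List<Long> ledgerIds,
                                                 List<Long> markDeleteLedgerIds) {
        List<Long> result = new ArrayList<>();
        if (ledgerIds.isEmpty()) {
            return result;
        }
        long currentLedger = ledgerIds.get(ledgerIds.size() - 1);
        // With no cursors there is no backlog, so only the active ledger limits deletion.
        long slowest = markDeleteLedgerIds.isEmpty()
                ? currentLedger
                : Collections.min(markDeleteLedgerIds);
        for (long id : ledgerIds) {
            // The ledger holding the slowest mark-delete position may still contain
            // unacknowledged entries, so only strictly older ledgers qualify.
            if (id < slowest && id != currentLedger) {
                result.add(id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(ledgersSafeToDelete(Arrays.asList(1L, 2L, 3L, 4L), Arrays.asList(3L, 2L)));
    }
}
```

In this model the cursors are never moved: a slow consumer simply keeps more ledgers alive, which matches the request to delete retained messages only.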







[GitHub] [pulsar] jangwind commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
jangwind commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r626635111



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2316,10 +2329,10 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {
                     log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name,
                             ls.getLedgerId());
                     break;
-                } else if (expired) {
+                } else if (expired || isTruncate) {
                     log.debug("[{}] Ledger {} has expired, ts {}", name, ls.getLedgerId(), ls.getTimestamp());

Review comment:
       Thank you very much. I will fix this debug info.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2316,10 +2329,10 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {
                     log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name,
                             ls.getLedgerId());
                     break;
-                } else if (expired) {
+                } else if (expired || isTruncate) {
                     log.debug("[{}] Ledger {} has expired, ts {}", name, ls.getLedgerId(), ls.getTimestamp());
                     ledgersToDelete.add(ls);
-                } else if (overRetentionQuota) {
+                } else if (overRetentionQuota || isTruncate) {
                     log.debug("[{}] Ledger {} is over quota", name, ls.getLedgerId());

Review comment:
       Thank you very much. I will fix this debug info.







[GitHub] [pulsar] eolivelli commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r619163436



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
##########
@@ -374,6 +375,27 @@ public void getListFromBundle(
         });
     }
 
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/truncate")
+    @ApiOperation(value = "Truncate a topic.",
+            notes = "NonPersistentTopic is not support truncate.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 412, message = "NonPersistentTopic is not support truncate")

Review comment:
       NonPersistentTopic **does** not support truncate

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
##########
@@ -1705,13 +1686,13 @@ public void testUnsubscribeOnNamespace(Integer numBundles) throws Exception {
     }
 
     private List<MessageId> publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx,
-                                                  boolean nullValue) throws Exception {
+                                                             boolean nullValue) throws Exception {
         List<MessageId> messageIds = new ArrayList<>();
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
-            .topic(topicName)
-            .enableBatching(false)
-            .messageRoutingMode(MessageRoutingMode.SinglePartition)
-            .create();
+                .topic(topicName)

Review comment:
       Please do not reformat.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -3096,5 +3096,30 @@ public void removeSubscribeRate(@Suspended final AsyncResponse asyncResponse,
         });
     }
 
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/truncate")
+    @ApiOperation(value = "Truncate a topic.",
+            notes = "The latest ledger cannot be deleted ,and only delete acknowledged ledgers.")

Review comment:
       The latest ledger will not be deleted, and only acknowledged ledgers can be deleted.

##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
##########
@@ -286,6 +287,19 @@ void run() throws PulsarAdminException {
         }
     }
 
+    @Parameters(commandDescription = "Truncate a topic. \n"
+            + "\t\tThe topic will be truncate, but the latest ledger cannot be deleted.")

Review comment:
       "The topic will be truncated, but the latest ledger cannot be deleted"

##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
##########
@@ -484,6 +485,19 @@ void run() throws PulsarAdminException {
         }
     }
 
+    @Parameters(commandDescription = "Truncate a topic. \n"
+            + "\t\tThe topic will be truncate, but the latest ledger cannot be deleted.")

Review comment:
       "The topic will be truncated, but the latest ledger cannot be deleted"

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3922,4 +3922,66 @@ protected void internalHandleResult(AsyncResponse asyncResponse,
             }
         }
     }
+
+    protected void internalTruncateTopic(AsyncResponse asyncResponse) {
+

Review comment:
       +1







[GitHub] [pulsar] jangwind commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
jangwind commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r619406375



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
##########
@@ -374,6 +375,27 @@ public void getListFromBundle(
         });
     }
 
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/truncate")
+    @ApiOperation(value = "Truncate a topic.",
+            notes = "NonPersistentTopic is not support truncate.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 412, message = "NonPersistentTopic is not support truncate")

Review comment:
       Thank you very much, I will fix it as soon as possible.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -3096,5 +3096,30 @@ public void removeSubscribeRate(@Suspended final AsyncResponse asyncResponse,
         });
     }
 
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/truncate")
+    @ApiOperation(value = "Truncate a topic.",
+            notes = "The latest ledger cannot be deleted ,and only delete acknowledged ledgers.")

Review comment:
       Thank you very much, I will fix it as soon as possible.







[GitHub] [pulsar] codelipenghui commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r618430645



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3922,4 +3922,66 @@ protected void internalHandleResult(AsyncResponse asyncResponse,
             }
         }
     }
+
+    protected void internalTruncateTopic(AsyncResponse asyncResponse) {
+

Review comment:
       We should check the topic ownership first and then get the topic reference. So that we can use the ManagedLedger of the topic to truncate directly.

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
##########
@@ -57,18 +57,15 @@
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
-import org.apache.pulsar.common.policies.data.AuthAction;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.policies.data.*;

Review comment:
       Please avoid using star imports.

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
##########
@@ -18,50 +18,17 @@
  */
 package org.apache.pulsar.broker.admin;
 
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-import com.google.common.collect.BoundType;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
+import com.google.common.collect.*;

Review comment:
       Please avoid using star imports.







[GitHub] [pulsar] codelipenghui merged pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326


   





[GitHub] [pulsar] MarvinCai commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r620137588



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -3738,6 +3751,35 @@ public void setEntriesAddedCounter(long count) {
 
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
+    @Override
+    public CompletableFuture<Void> asyncTruncate() {
+
+        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+        for (ManagedCursor cursor : cursors) {
+            final CompletableFuture<Void> future = new CompletableFuture<>();
+            cursor.asyncClearBacklog(new AsyncCallbacks.ClearBacklogCallback() {

Review comment:
       The original request is to delete retained messages. Retained messages are acknowledged messages, while messages in the backlog are unacknowledged.
   There are dedicated commands to clear the whole backlog or to expire certain messages in it. This command should only delete as many retained messages as possible (messages already consumed and acknowledged by all consumers), so we shouldn't clear the backlog here.
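
   The distinction can be illustrated with a simplified model. This is only a sketch under stated assumptions, not the ManagedLedgerImpl logic: `RetainedLedgers` and `deletable` are hypothetical names, ledger ids are assumed to be sorted in ascending order with the last one being the active ledger, and the slowest cursor is represented only by the id of the ledger it currently points into.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the managed-ledger module.
final class RetainedLedgers {
    private RetainedLedgers() {}

    // Returns the ledgers that hold only acknowledged (retained) data:
    // every non-active ledger strictly before the slowest cursor's ledger.
    // Ledgers at or after that point still hold backlog and are left alone.
    static List<Long> deletable(List<Long> ledgerIds, long slowestCursorLedgerId) {
        List<Long> result = new ArrayList<>();
        // Stop before the last element: the active ledger is never deleted.
        for (int i = 0; i < ledgerIds.size() - 1; i++) {
            long id = ledgerIds.get(i);
            if (id < slowestCursorLedgerId) {
                result.add(id);
            }
        }
        return result;
    }
}
```

   In this model no cursor is moved: a slow consumer simply reduces the number of ledgers that can be deleted.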







[GitHub] [pulsar] jangwind commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
jangwind commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r626638894



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2156,6 +2157,14 @@ private void scheduleDeferredTrimming(CompletableFuture<?> promise) {
         scheduledExecutor.schedule(safeRun(() -> trimConsumedLedgersInBackground(promise)), 100, TimeUnit.MILLISECONDS);

Review comment:
       Sorry, does "this method" refer to `scheduledExecutor.schedule()` or `trimConsumedLedgersInBackground`? If it is one of them, please tell me how to modify it.







[GitHub] [pulsar] tuteng commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
tuteng commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r619931462



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2316,10 +2329,10 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {
                     log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name,
                             ls.getLedgerId());
                     break;
-                } else if (expired) {
+                } else if (expired || isTruncate) {
                     log.debug("[{}] Ledger {} has expired, ts {}", name, ls.getLedgerId(), ls.getTimestamp());
                     ledgersToDelete.add(ls);
-                } else if (overRetentionQuota) {
+                } else if (overRetentionQuota || isTruncate) {
                     log.debug("[{}] Ledger {} is over quota", name, ls.getLedgerId());

Review comment:
       When overRetentionQuota is `false` and isTruncate is `true`, it seems that the debug info needs to be updated.
   
   

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2316,10 +2329,10 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {
                     log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name,
                             ls.getLedgerId());
                     break;
-                } else if (expired) {
+                } else if (expired || isTruncate) {
                     log.debug("[{}] Ledger {} has expired, ts {}", name, ls.getLedgerId(), ls.getTimestamp());

Review comment:
       When `expired` is `false` and `isTruncate` is `true`, it seems that the debug info needs to be updated.
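
   One possible way to keep the debug output accurate is to derive the logged reason separately from the branch condition. The following is a minimal sketch, not the change made in the PR: `TrimReason` and `reasonFor` are hypothetical names, and the reason strings are placeholders.

```java
// Hypothetical helper for illustration; not part of ManagedLedgerImpl.
final class TrimReason {
    private TrimReason() {}

    // Picks the reason to log so that a truncate-driven deletion is not
    // reported as "expired" or "over quota" when neither condition holds.
    static String reasonFor(boolean expired, boolean overRetentionQuota, boolean isTruncate) {
        if (expired) {
            return "expired";
        }
        if (isTruncate) {
            return "truncate requested";
        }
        if (overRetentionQuota) {
            return "over quota";
        }
        return "retained";
    }
}
```

   The result could then be passed as a parameter to a single `log.debug` call instead of keeping one fixed message per branch.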







[GitHub] [pulsar] jangwind commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
jangwind commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r619817872



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
##########
@@ -600,4 +600,10 @@ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.Upd
      * will got null if corresponding ledger not exists.
      */
     CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId);
+
+    /**
+     * Truncate ledgers
+     * The latest ledger cannot be deleted ,and only delete acknowledged ledgers

Review comment:
       You are right. The description has been changed to ‘The truncate operation will move all cursors to the end of the topic and delete all inactive ledgers.’







[GitHub] [pulsar] jangwind commented on a change in pull request #10326: support truncate topic

Posted by GitBox <gi...@apache.org>.
jangwind commented on a change in pull request #10326:
URL: https://github.com/apache/pulsar/pull/10326#discussion_r619817315



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -3738,6 +3751,35 @@ public void setEntriesAddedCounter(long count) {
 
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
+    @Override
+    public CompletableFuture<Void> asyncTruncate() {
+
+        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+        for (ManagedCursor cursor : cursors) {
+            final CompletableFuture<Void> future = new CompletableFuture<>();
+            cursor.asyncClearBacklog(new AsyncCallbacks.ClearBacklogCallback() {

Review comment:
       If it doesn't clear the backlog, the operation would be a trim rather than a truncate, so I clear the backlog here.



