Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/05/30 04:22:51 UTC

[GitHub] [pulsar] wuzhanpeng opened a new pull request #7111: Trigger rollover when meeting maxLedgerRolloverTimeMinutes

wuzhanpeng opened a new pull request #7111:
URL: https://github.com/apache/pulsar/pull/7111


   ### Motivation
   This pull request implements a monitor thread that checks whether the current ledger of each topic has exceeded `managedLedgerMaxLedgerRolloverTimeMinutes` and, if so, triggers a rollover so that the configuration actually takes effect. A further benefit is that closing the current ledger on rollover allows its storage to be released. Today, rollover is only triggered when a new entry is added, so for rarely used topics the data in the current ledger has often already expired but cannot be reclaimed, which wastes disk space.
   
   ### Expected behaviors
   The monitor thread is scheduled at a fixed interval equal to `managedLedgerMaxLedgerRolloverTimeMinutes`. Each check evaluates two conditions together: `currentLedgerEntries > 0` and `currentLedgerIsFull()`. When the current ledger has no entries, no rollover is triggered, which avoids creating unnecessary ledgers.
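A minimal sketch of the check described above. `RolloverCandidate`, `shouldRoll`, `checkAll`, and `fake` are hypothetical names standing in for `ManagedLedgerImpl` and its `currentLedgerIsFull()` logic, which are not reproduced here; using the rollover time as both the initial delay and the period is also an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class LedgerFullMonitorSketch {
    // Hypothetical stand-in for ManagedLedgerImpl; names are illustrative only.
    interface RolloverCandidate {
        long currentLedgerEntries();
        boolean currentLedgerIsFull();
        void rollCurrentLedger();
    }

    // The two conditions from the PR: skip empty ledgers, roll only when full.
    static boolean shouldRoll(RolloverCandidate ledger) {
        return ledger.currentLedgerEntries() > 0 && ledger.currentLedgerIsFull();
    }

    // One monitor pass; returns how many ledgers were rolled.
    static int checkAll(List<RolloverCandidate> ledgers) {
        int rolled = 0;
        for (RolloverCandidate ledger : ledgers) {
            if (shouldRoll(ledger)) {
                ledger.rollCurrentLedger();
                rolled++;
            }
        }
        return rolled;
    }

    // Hypothetical fake ledger for demonstration.
    static RolloverCandidate fake(long entries, boolean full) {
        return new RolloverCandidate() {
            public long currentLedgerEntries() { return entries; }
            public boolean currentLedgerIsFull() { return full; }
            public void rollCurrentLedger() { }
        };
    }

    public static void main(String[] args) {
        int rolloverMinutes = 240; // example value for managedLedgerMaxLedgerRolloverTimeMinutes
        List<RolloverCandidate> ledgers = Arrays.asList(
                fake(0, true),    // empty: no new ledger is created
                fake(10, true),   // non-empty and full: rolled
                fake(10, false)); // not yet full: left alone

        ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
        // Assumed: initial delay and period both equal to the rollover time.
        monitor.scheduleAtFixedRate(() -> System.out.println("rolled " + checkAll(ledgers)),
                rolloverMinutes, rolloverMinutes, TimeUnit.MINUTES);
        // Run one pass immediately for demonstration, then stop the monitor.
        System.out.println("rolled " + checkAll(ledgers)); // rolled 1
        monitor.shutdownNow();
    }
}
```

Skipping ledgers with zero entries is what keeps an idle topic from producing a fresh, empty ledger on every pass.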
   
   ### Modifications
   - The main modification took place in `ManagedLedgerImpl`
   - In addition, a check thread was added in the `BrokerService`
   
   maybe related to #6935 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #7111: Trigger rollover when meeting maxLedgerRolloverTimeMinutes

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #7111:
URL: https://github.com/apache/pulsar/pull/7111#issuecomment-638592868


   move to 2.7.0 first.





[GitHub] [pulsar] sijie commented on a change in pull request #7111: Trigger rollover when meeting maxLedgerRolloverTimeMinutes

Posted by GitBox <gi...@apache.org>.
sijie commented on a change in pull request #7111:
URL: https://github.com/apache/pulsar/pull/7111#discussion_r436420743



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -277,6 +278,8 @@ public BrokerService(PulsarService pulsar) throws Exception {
             Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-publish-buffer-monitor"));
         this.consumedLedgersMonitor = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("consumed-Ledgers-monitor"));
+        this.ledgerFullMonitor =

Review comment:
       +1 @wuzhanpeng can we re-use an existing executor? Otherwise, we end up creating a lot of executors.
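One way to follow this suggestion, sketched under the assumption that the broker already owns a scheduler that can be shared (the class and method names below are hypothetical, not actual `BrokerService` members): both periodic checks are scheduled on one `ScheduledExecutorService` instead of each getting its own thread.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SharedExecutorSketch {
    // Runs two monitor-style tasks on one scheduler and reports whether both ran.
    static boolean runBothFor(long millis) {
        AtomicInteger consumedChecks = new AtomicInteger();
        AtomicInteger ledgerFullChecks = new AtomicInteger();
        // One shared scheduler instead of a dedicated executor per monitor.
        ScheduledExecutorService sharedScheduler = Executors.newSingleThreadScheduledExecutor();
        sharedScheduler.scheduleAtFixedRate(consumedChecks::incrementAndGet, 0, 10, TimeUnit.MILLISECONDS);
        sharedScheduler.scheduleAtFixedRate(ledgerFullChecks::incrementAndGet, 0, 10, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Shutting down the single executor stops both periodic tasks.
        sharedScheduler.shutdown();
        return consumedChecks.get() > 0 && ledgerFullChecks.get() > 0;
    }

    public static void main(String[] args) {
        System.out.println(runBothFor(100));
    }
}
```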







[GitHub] [pulsar] jiazhai commented on pull request #7111: Trigger rollover when meeting maxLedgerRolloverTimeMinutes

Posted by GitBox <gi...@apache.org>.
jiazhai commented on pull request #7111:
URL: https://github.com/apache/pulsar/pull/7111#issuecomment-638840877


   @merlimat Would you please help review it again?





[GitHub] [pulsar] wuzhanpeng commented on a change in pull request #7111: Trigger rollover when meeting maxLedgerRolloverTimeMinutes

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on a change in pull request #7111:
URL: https://github.com/apache/pulsar/pull/7111#discussion_r433025228



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1391,6 +1392,38 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
         }
     }
 
+    synchronized void createLedgerAfterClosed() {
+        STATE_UPDATER.set(this, State.CreatingLedger);
+        this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
+        mbean.startDataLedgerCreateOp();
+        asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
+    }
+
+    @Override
+    public void rollCurrentLedgerIfFull() {
+        log.info("[{}] Start checking if current ledger is full", name);
+        if (currentLedgerEntries > 0 && currentLedgerIsFull()) {
+            STATE_UPDATER.set(this, State.ClosingLedger);
+            currentLedger.asyncClose(new AsyncCallback.CloseCallback() {
+                @Override
+                public void closeComplete(int rc, LedgerHandle lh, Object o) {
+                    checkArgument(currentLedger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s",
+                            currentLedger.getId(),
+                            lh.getId());
+
+                    if (rc == BKException.Code.OK) {
+                        log.debug("Successfully closed ledger {}", lh.getId());
+                    } else {
+                        log.warn("Error when closing ledger {}. Status={}", lh.getId(), BKException.getMessage(rc));
+                    }
+
+                    ledgerClosed(lh);
+                    createLedgerAfterClosed();

Review comment:
       Yes, in the current logic we can stay in the `closed` state until the next entry comes in. However, if we wait, the most recently created ledger of the topic will never be removed, because the trimming strategy does not remove the current ledger. In that scenario we keep a lot of useless data if the topic is no longer used, and it may cause disk problems if the cluster holds many abandoned topics.
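To illustrate the point about trimming, here is a simplified sketch, not Pulsar's actual trimming code: the `LedgerInfo` class and `deletableLedgers` method are hypothetical, and ledger age is reduced to a single timestamp. Expired ledgers are deletable only if they are not the current ledger, so an expired current ledger is reclaimed only after a rollover makes a newer ledger current.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TrimSketch {
    // Hypothetical ledger record; age is simplified to one timestamp in minutes.
    static final class LedgerInfo {
        final long id;
        final long timestampMinutes;

        LedgerInfo(long id, long timestampMinutes) {
            this.id = id;
            this.timestampMinutes = timestampMinutes;
        }
    }

    // Expired ledgers are deletable, except the current one, which is always kept.
    static List<Long> deletableLedgers(List<LedgerInfo> ledgers, long currentLedgerId,
                                       long nowMinutes, long retentionMinutes) {
        List<Long> result = new ArrayList<>();
        for (LedgerInfo ledger : ledgers) {
            boolean expired = nowMinutes - ledger.timestampMinutes > retentionMinutes;
            if (expired && ledger.id != currentLedgerId) {
                result.add(ledger.id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Ledger 2 is current and expired, but it is still kept.
        List<LedgerInfo> beforeRoll = Arrays.asList(new LedgerInfo(1, 0), new LedgerInfo(2, 100));
        System.out.println(deletableLedgers(beforeRoll, 2, 1000, 60)); // [1]

        // After a rollover makes ledger 3 current, ledger 2 becomes deletable.
        List<LedgerInfo> afterRoll = Arrays.asList(
                new LedgerInfo(1, 0), new LedgerInfo(2, 100), new LedgerInfo(3, 1000));
        System.out.println(deletableLedgers(afterRoll, 3, 1000, 60)); // [1, 2]
    }
}
```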







[GitHub] [pulsar] wuzhanpeng commented on a change in pull request #7111: Trigger rollover when meeting maxLedgerRolloverTimeMinutes

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on a change in pull request #7111:
URL: https://github.com/apache/pulsar/pull/7111#discussion_r433131223



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -485,6 +489,12 @@ protected void startConsumedLedgersMonitor() {
         }
     }
 
+    protected void startLedgerFullMonitor() {
+        int interval = pulsar().getConfiguration().getManagedLedgerMaxLedgerRolloverTimeMinutes();
+        ledgerFullMonitor.scheduleAtFixedRate(safeRun(this::checkLedgerFull),

Review comment:
       I've added a `shutdown` for the monitor when `BrokerService` is closed.
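A minimal sketch of stopping a dedicated monitor executor on close, assuming a hypothetical `MonitorOwner` class in place of `BrokerService` (only the `ledgerFullMonitor` field name comes from the PR diff): `shutdownNow()` drops the pending periodic run, and `awaitTermination` bounds how long closing waits.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class MonitorOwner {
    // Dedicated single-thread scheduler, mirroring the PR's ledgerFullMonitor field.
    private final ScheduledExecutorService ledgerFullMonitor =
            Executors.newSingleThreadScheduledExecutor();

    void start(long intervalMinutes) {
        ledgerFullMonitor.scheduleAtFixedRate(() -> { /* check ledgers here */ },
                intervalMinutes, intervalMinutes, TimeUnit.MINUTES);
    }

    boolean isStopped() {
        return ledgerFullMonitor.isTerminated();
    }

    // Stop the periodic task and release the thread when the service closes.
    void close() {
        ledgerFullMonitor.shutdownNow();
        try {
            ledgerFullMonitor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        MonitorOwner owner = new MonitorOwner();
        owner.start(240);
        owner.close();
        System.out.println(owner.isStopped()); // true
    }
}
```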







[GitHub] [pulsar] codelipenghui commented on a change in pull request #7111: Trigger rollover when meeting maxLedgerRolloverTimeMinutes

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #7111:
URL: https://github.com/apache/pulsar/pull/7111#discussion_r435663378



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1391,6 +1392,38 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
         }
     }
 
+    synchronized void createLedgerAfterClosed() {
+        STATE_UPDATER.set(this, State.CreatingLedger);
+        this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
+        mbean.startDataLedgerCreateOp();
+        asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
+    }
+
+    @Override
+    public void rollCurrentLedgerIfFull() {
+        log.info("[{}] Start checking if current ledger is full", name);
+        if (currentLedgerEntries > 0 && currentLedgerIsFull()) {
+            STATE_UPDATER.set(this, State.ClosingLedger);
+            currentLedger.asyncClose(new AsyncCallback.CloseCallback() {
+                @Override
+                public void closeComplete(int rc, LedgerHandle lh, Object o) {
+                    checkArgument(currentLedger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s",
+                            currentLedger.getId(),
+                            lh.getId());
+
+                    if (rc == BKException.Code.OK) {
+                        log.debug("Successfully closed ledger {}", lh.getId());
+                    } else {
+                        log.warn("Error when closing ledger {}. Status={}", lh.getId(), BKException.getMessage(rc));
+                    }
+
+                    ledgerClosed(lh);
+                    createLedgerAfterClosed();

Review comment:
       OK, I think we can change the current logic: if the current ledger is closed, we can delete it. I'm not sure whether this change causes any problems. @merlimat Could you please help check this?
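The change proposed here could look roughly like the following sketch. It is hypothetical and not taken from the PR: the `eligible` method assumes the list of already-expired ledger ids is computed elsewhere, and only decides whether the current ledger may join them, which it may once it has been closed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ClosedCurrentLedgerTrimSketch {
    // Returns ids eligible for deletion; the current ledger qualifies only if it is closed.
    static List<Long> eligible(List<Long> expiredLedgerIds, long currentLedgerId,
                               boolean currentLedgerClosed) {
        List<Long> result = new ArrayList<>();
        for (long id : expiredLedgerIds) {
            if (id != currentLedgerId || currentLedgerClosed) {
                result.add(id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> expired = Arrays.asList(1L, 2L);
        System.out.println(eligible(expired, 2L, false)); // [1]
        System.out.println(eligible(expired, 2L, true));  // [1, 2]
    }
}
```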







[GitHub] [pulsar] wuzhanpeng commented on a change in pull request #7111: Trigger rollover when meeting maxLedgerRolloverTimeMinutes

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on a change in pull request #7111:
URL: https://github.com/apache/pulsar/pull/7111#discussion_r433583901



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
##########
@@ -472,4 +472,9 @@ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.Set
      * @param promise
      */
     void trimConsumedLedgersInBackground(CompletableFuture<?> promise);
+
+    /**
+     * Roll current ledger if it is full
+     */
+    void rollCurrentLedgerIfFull();

Review comment:
       Now I'm a little confused. If we treat rollover as an internal task, should `consumedLedgersMonitor` or `backlogQuotaChecker` be internal tasks as well? Or am I misunderstanding something?







[GitHub] [pulsar] merlimat commented on a change in pull request #7111: Trigger rollover when meeting maxLedgerRolloverTimeMinutes

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #7111:
URL: https://github.com/apache/pulsar/pull/7111#discussion_r433360867



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
##########
@@ -472,4 +472,9 @@ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.Set
      * @param promise
      */
     void trimConsumedLedgersInBackground(CompletableFuture<?> promise);
+
+    /**
+     * Roll current ledger if it is full
+     */
+    void rollCurrentLedgerIfFull();

Review comment:
       I was thinking that this is a managed ledger internal task, which the `BrokerService` shouldn't be concerned with. It would be better to handle it in `ManagedLedgerFactoryImpl`, going through all open managed ledger instances and checking whether a rollover has to be forced.
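A rough sketch of that placement, using hypothetical names (`LedgerFactorySketch`, `LedgerImplSketch`, `openLedgers`, `checkRollover`, `checkAllLedgersForRollover`) rather than the real `ManagedLedgerFactoryImpl` API: the factory tracks its open ledgers, so it can own the periodic pass and call a package-private check on each, keeping the method out of the public `ManagedLedger` interface.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class LedgerFactorySketch {
    // Hypothetical managed ledger implementation with a package-private check.
    static class LedgerImplSketch {
        final AtomicInteger rolls = new AtomicInteger();
        final boolean needsRollover;

        LedgerImplSketch(boolean needsRollover) {
            this.needsRollover = needsRollover;
        }

        // Not part of any public interface: only the factory calls it.
        void checkRollover() {
            if (needsRollover) {
                rolls.incrementAndGet();
            }
        }
    }

    // The factory keeps every open ledger by name.
    final Map<String, LedgerImplSketch> openLedgers = new ConcurrentHashMap<>();

    // Would run periodically on a scheduler owned by the factory.
    void checkAllLedgersForRollover() {
        openLedgers.values().forEach(LedgerImplSketch::checkRollover);
    }

    public static void main(String[] args) {
        LedgerFactorySketch factory = new LedgerFactorySketch();
        factory.openLedgers.put("tenant/ns/persistent/a", new LedgerImplSketch(true));
        factory.openLedgers.put("tenant/ns/persistent/b", new LedgerImplSketch(false));
        factory.checkAllLedgersForRollover();
        System.out.println(factory.openLedgers.get("tenant/ns/persistent/a").rolls.get()); // 1
        System.out.println(factory.openLedgers.get("tenant/ns/persistent/b").rolls.get()); // 0
    }
}
```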







[GitHub] [pulsar] wuzhanpeng commented on a change in pull request #7111: Trigger rollover when meeting maxLedgerRolloverTimeMinutes

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on a change in pull request #7111:
URL: https://github.com/apache/pulsar/pull/7111#discussion_r433029733



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
##########
@@ -472,4 +472,9 @@ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.Set
      * @param promise
      */
     void trimConsumedLedgersInBackground(CompletableFuture<?> promise);
+
+    /**
+     * Roll current ledger if it is full
+     */
+    void rollCurrentLedgerIfFull();

Review comment:
       I'd like your advice here, because I'm not sure whether type casting in the `BrokerService` is a suitable implementation. Maybe I can change it to:
   ```java
   ((ManagedLedgerImpl) managedLedger).rollCurrentLedgerIfFull();
   ```
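If the cast stays in the broker, a guarded version avoids a `ClassCastException` when a different `ManagedLedger` implementation (for example, a test mock) is passed in. This sketch uses hypothetical stand-in types (`Ledger`, `LedgerImpl`, `OtherLedger`), not the real Pulsar classes:

```java
class GuardedCastSketch {
    // Hypothetical stand-ins for ManagedLedger and ManagedLedgerImpl.
    interface Ledger { }

    static class LedgerImpl implements Ledger {
        boolean rolled;

        void rollCurrentLedgerIfFull() {
            rolled = true;
        }
    }

    static class OtherLedger implements Ledger { }

    // Only call the implementation-specific method when the runtime type supports it.
    static boolean tryRoll(Ledger ledger) {
        if (ledger instanceof LedgerImpl) {
            ((LedgerImpl) ledger).rollCurrentLedgerIfFull();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(tryRoll(new LedgerImpl()));  // true
        System.out.println(tryRoll(new OtherLedger())); // false
    }
}
```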







[GitHub] [pulsar] merlimat commented on a change in pull request #7111: Trigger rollover when meeting maxLedgerRolloverTimeMinutes

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #7111:
URL: https://github.com/apache/pulsar/pull/7111#discussion_r432816840



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
##########
@@ -472,4 +472,9 @@ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.Set
      * @param promise
      */
     void trimConsumedLedgersInBackground(CompletableFuture<?> promise);
+
+    /**
+     * Roll current ledger if it is full
+     */
+    void rollCurrentLedgerIfFull();

Review comment:
       Why do we need to expose this in the interface? It would be better to keep it as an implementation detail.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1391,6 +1392,38 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
         }
     }
 
+    synchronized void createLedgerAfterClosed() {
+        STATE_UPDATER.set(this, State.CreatingLedger);
+        this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
+        mbean.startDataLedgerCreateOp();
+        asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
+    }
+
+    @Override
+    public void rollCurrentLedgerIfFull() {
+        log.info("[{}] Start checking if current ledger is full", name);
+        if (currentLedgerEntries > 0 && currentLedgerIsFull()) {
+            STATE_UPDATER.set(this, State.ClosingLedger);
+            currentLedger.asyncClose(new AsyncCallback.CloseCallback() {
+                @Override
+                public void closeComplete(int rc, LedgerHandle lh, Object o) {
+                    checkArgument(currentLedger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s",
+                            currentLedger.getId(),
+                            lh.getId());
+
+                    if (rc == BKException.Code.OK) {
+                        log.debug("Successfully closed ledger {}", lh.getId());
+                    } else {
+                        log.warn("Error when closing ledger {}. Status={}", lh.getId(), BKException.getMessage(rc));
+                    }
+
+                    ledgerClosed(lh);
+                    createLedgerAfterClosed();

Review comment:
       We don't need to initiate the creation of a ledger at this point. We can stay in the `LedgerClosed` state until a new write comes in.
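A simplified state-machine sketch of that suggestion, with hypothetical names rather than the real `ManagedLedgerImpl` states and methods: the periodic rollover only closes the ledger, and the next ledger is created lazily by the first write that arrives afterwards.

```java
import java.util.ArrayList;
import java.util.List;

class LazyLedgerSketch {
    enum State { LedgerOpened, LedgerClosed }

    State state = State.LedgerOpened;
    long currentLedgerId = 1;
    long nextLedgerId = 2;
    final List<String> events = new ArrayList<>();

    // Called by the periodic rollover check: close only, do not create.
    void rollover() {
        if (state == State.LedgerOpened) {
            events.add("close " + currentLedgerId);
            state = State.LedgerClosed;
        }
    }

    // A write arriving in LedgerClosed first creates the next ledger.
    void addEntry(String entry) {
        if (state == State.LedgerClosed) {
            currentLedgerId = nextLedgerId++;
            events.add("create " + currentLedgerId);
            state = State.LedgerOpened;
        }
        events.add("write " + entry + " to " + currentLedgerId);
    }

    public static void main(String[] args) {
        LazyLedgerSketch ml = new LazyLedgerSketch();
        ml.rollover();
        ml.rollover(); // idle topic: no new ledger is created
        ml.addEntry("m1");
        System.out.println(ml.events); // [close 1, create 2, write m1 to 2]
    }
}
```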

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -277,6 +278,8 @@ public BrokerService(PulsarService pulsar) throws Exception {
             Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-publish-buffer-monitor"));
         this.consumedLedgersMonitor = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("consumed-Ledgers-monitor"));
+        this.ledgerFullMonitor =

Review comment:
       This thread is not being stopped. Potentially we could also reuse an existing executor.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -485,6 +489,12 @@ protected void startConsumedLedgersMonitor() {
         }
     }
 
+    protected void startLedgerFullMonitor() {
+        int interval = pulsar().getConfiguration().getManagedLedgerMaxLedgerRolloverTimeMinutes();
+        ledgerFullMonitor.scheduleAtFixedRate(safeRun(this::checkLedgerFull),

Review comment:
       The recurring task needs to be cancelled when `BrokerService` is closed.
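When the task runs on a shared executor, the executor itself should not be shut down by this one monitor, so one option is to keep the `ScheduledFuture` returned by `scheduleAtFixedRate` and cancel it on close. The class and method names below are hypothetical:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class CancellableMonitorSketch {
    private final ScheduledExecutorService sharedExecutor;
    private ScheduledFuture<?> ledgerFullTask;

    CancellableMonitorSketch(ScheduledExecutorService sharedExecutor) {
        this.sharedExecutor = sharedExecutor;
    }

    void start(long intervalMinutes) {
        ledgerFullTask = sharedExecutor.scheduleAtFixedRate(() -> { /* check ledgers */ },
                intervalMinutes, intervalMinutes, TimeUnit.MINUTES);
    }

    // Cancel only this task; the shared executor keeps serving other monitors.
    void close() {
        if (ledgerFullTask != null) {
            ledgerFullTask.cancel(false);
        }
    }

    boolean isCancelled() {
        return ledgerFullTask != null && ledgerFullTask.isCancelled();
    }

    public static void main(String[] args) {
        ScheduledExecutorService shared = Executors.newSingleThreadScheduledExecutor();
        CancellableMonitorSketch monitor = new CancellableMonitorSketch(shared);
        monitor.start(240);
        monitor.close();
        System.out.println(monitor.isCancelled() + " " + shared.isShutdown()); // true false
        shared.shutdown(); // the owner of the shared executor shuts it down separately
    }
}
```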







[GitHub] [pulsar] codelipenghui merged pull request #7111: Trigger rollover when meeting maxLedgerRolloverTimeMinutes

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #7111:
URL: https://github.com/apache/pulsar/pull/7111


   





[GitHub] [pulsar] codelipenghui commented on a change in pull request #7111: Trigger rollover when meeting maxLedgerRolloverTimeMinutes

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #7111:
URL: https://github.com/apache/pulsar/pull/7111#discussion_r435662052



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
##########
@@ -472,4 +472,9 @@ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.Set
      * @param promise
      */
     void trimConsumedLedgersInBackground(CompletableFuture<?> promise);
+
+    /**
+     * Roll current ledger if it is full
+     */
+    void rollCurrentLedgerIfFull();

Review comment:
       @wuzhanpeng I think you can open an issue to track moving this monitor, which currently lives outside the managed ledger, into the managed ledger itself.



