You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/05/30 06:52:50 UTC

[GitHub] [pulsar] merlimat commented on a change in pull request #7111: Trigger rollover when meeting maxLedgerRolloverTimeMinutes

merlimat commented on a change in pull request #7111:
URL: https://github.com/apache/pulsar/pull/7111#discussion_r432816840



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
##########
@@ -472,4 +472,9 @@ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.Set
      * @param promise
      */
     void trimConsumedLedgersInBackground(CompletableFuture<?> promise);
+
+    /**
+     * Roll current ledger if it is full
+     */
+    void rollCurrentLedgerIfFull();

Review comment:
       Why do we need to expose this in the interface, it should be better to keep in implementation details

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1391,6 +1392,38 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
         }
     }
 
+    synchronized void createLedgerAfterClosed() {
+        STATE_UPDATER.set(this, State.CreatingLedger);
+        this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
+        mbean.startDataLedgerCreateOp();
+        asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
+    }
+
+    @Override
+    public void rollCurrentLedgerIfFull() {
+        log.info("[{}] Start checking if current ledger is full", name);
+        if (currentLedgerEntries > 0 && currentLedgerIsFull()) {
+            STATE_UPDATER.set(this, State.ClosingLedger);
+            currentLedger.asyncClose(new AsyncCallback.CloseCallback() {
+                @Override
+                public void closeComplete(int rc, LedgerHandle lh, Object o) {
+                    checkArgument(currentLedger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s",
+                            currentLedger.getId(),
+                            lh.getId());
+
+                    if (rc == BKException.Code.OK) {
+                        log.debug("Successfuly closed ledger {}", lh.getId());
+                    } else {
+                        log.warn("Error when closing ledger {}. Status={}", lh.getId(), BKException.getMessage(rc));
+                    }
+
+                    ledgerClosed(lh);
+                    createLedgerAfterClosed();

Review comment:
       We don't need initiate the creation of a ledger at this point. We can stay in LedgerClosed state until a new write comes in.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -277,6 +278,8 @@ public BrokerService(PulsarService pulsar) throws Exception {
             Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-publish-buffer-monitor"));
         this.consumedLedgersMonitor = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("consumed-Ledgers-monitor"));
+        this.ledgerFullMonitor =

Review comment:
       This thread is not being stopped. Potentially we could also reuse an existing executor.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -485,6 +489,12 @@ protected void startConsumedLedgersMonitor() {
         }
     }
 
+    protected void startLedgerFullMonitor() {
+        int interval = pulsar().getConfiguration().getManagedLedgerMaxLedgerRolloverTimeMinutes();
+        ledgerFullMonitor.scheduleAtFixedRate(safeRun(this::checkLedgerFull),

Review comment:
       the recurring task needs to be cancelled when BrokerService is closed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org