Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/05/04 12:25:01 UTC

[GitHub] [pulsar] 315157973 opened a new pull request, #15425: [PIP 81] Part-1 Split createNewMetadataLedger into multiple methods for reuse

315157973 opened a new pull request, #15425:
URL: https://github.com/apache/pulsar/pull/15425

   ### Motivation
   PIP 81 (#10729) contains so many modifications that it is difficult to get a code review (CR).
   So I split that PR into multiple sub-PRs to make the review easier.
   
   ### Modifications
   I split the logic in `createNewMetadataLedger` into multiple methods to facilitate subsequent reuse:
   1. Move the ledger-creation logic into a separate method, `doCreateNewMetadataLedger`.
   2. `switchToNewLedger` does not need another layer of callback wrapping, so the redundant wrapper is removed.
   3. When persisting data fails, the newly created ledger must be deleted; this logic is moved into `deleteLedger`.
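
As a rough illustration of modification 1, here is a minimal sketch of wrapping a callback-style create call in a `CompletableFuture` so that later steps can be chained onto it. The names `SplitCreateSketch`, `CreateCallback`, `asyncCreate`, and `doCreate` are hypothetical stand-ins invented for this example; they are not the BookKeeper or managed-ledger API, and the handle is a plain `String`.

```java
import java.util.concurrent.CompletableFuture;

// Sketch only: every name here is a hypothetical stand-in, not Pulsar/BookKeeper code.
final class SplitCreateSketch {

    // Callback-style result handler, similar in spirit to an async create callback.
    interface CreateCallback {
        void createComplete(int rc, String handle);
    }

    // Fake async create: invokes the callback immediately, rc 0 meaning success.
    static void asyncCreate(boolean succeed, CreateCallback cb) {
        if (succeed) {
            cb.createComplete(0, "ledger-1");
        } else {
            cb.createComplete(-1, null);
        }
    }

    // The extracted step: it only creates, and reports the outcome through a future.
    static CompletableFuture<String> doCreate(boolean succeed) {
        CompletableFuture<String> future = new CompletableFuture<>();
        asyncCreate(succeed, (rc, handle) -> {
            if (rc != 0) {
                future.completeExceptionally(
                        new IllegalStateException("create failed, rc=" + rc));
                return;
            }
            future.complete(handle);
        });
        return future;
    }

    public static void main(String[] args) {
        // Callers decide what to do after creation (persist, switch, delete on failure).
        doCreate(true).thenAccept(h -> System.out.println("created " + h));
        doCreate(false).whenComplete((h, e) -> System.out.println("failed: " + (e != null)));
    }
}
```

Keeping the create step free of follow-up logic is what lets other callers reuse it with their own continuation.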
   
   ### Verifying this change
   This modification does not change any existing behavior, so no new unit tests are required; the existing unit tests must still pass.
   
   ### Documentation
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `no-need-doc` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-added`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] 315157973 commented on pull request #15425: [PIP 81] Part-1 Split createNewMetadataLedger into multiple methods for reuse

Posted by GitBox <gi...@apache.org>.
315157973 commented on PR #15425:
URL: https://github.com/apache/pulsar/pull/15425#issuecomment-1119680595

   /pulsarbot run-failure-checks




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15425: [PIP 81] Part-1 Split createNewMetadataLedger into multiple methods for reuse

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15425:
URL: https://github.com/apache/pulsar/pull/15425#discussion_r865470173


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2532,10 +2532,42 @@ void internalFlushPendingMarkDeletes() {
 
     void createNewMetadataLedger(final VoidCallback callback) {
         ledger.mbean.startCursorLedgerCreateOp();
+        doCreateNewMetadataLedger().thenAccept(newLedgerHandle -> {
+            MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
+            // Created the ledger, now write the last position content
+            persistPositionToLedger(newLedgerHandle, mdEntry, new VoidCallback() {
+                @Override
+                public void operationComplete() {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(),
+                                mdEntry.newPosition, name);
+                    }
+                    switchToNewLedger(newLedgerHandle, callback);
+                }
+
+                @Override
+                public void operationFailed(ManagedLedgerException exception) {
+                    log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(),
+                            mdEntry.newPosition, name);
+
+                    deleteLedger(newLedgerHandle);
+                    callback.operationFailed(exception);
+                }
+            });
+        }).whenComplete((result, e) -> {
+            ledger.mbean.endCursorLedgerCreateOp();
+            if (e != null) {
+                callback.operationFailed(createManagedLedgerException(e));
+            }
+        });
+    }
 
+    private CompletableFuture<LedgerHandle> doCreateNewMetadataLedger() {
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
         ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> {
 
             if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
+                future.complete(null);

Review Comment:
   It looks like there is a risk of an NPE here.
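
To make the concern concrete, here is a small sketch of what standard `CompletableFuture` semantics do when a future is completed with `null` and the next stage dereferences the value without a check. The class and method names are hypothetical, and a `String` stands in for the ledger handle; only `java.util.concurrent` calls are used.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: hypothetical names, standard java.util.concurrent behavior.
final class NullResultSketch {

    // Completes a future with null, dereferences the value in thenAccept,
    // and returns the throwable that whenComplete observes (null if none).
    static Throwable observeFailure() {
        CompletableFuture<String> created = new CompletableFuture<>();
        created.complete(null); // mirrors future.complete(null) in the diff

        AtomicReference<Throwable> seen = new AtomicReference<>();
        created.thenAccept(handle -> {
            // No null check: this line throws NullPointerException.
            System.out.println(handle.length());
        }).whenComplete((ignored, e) -> seen.set(e));
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable t = observeFailure();
        System.out.println("wrapped: " + (t instanceof CompletionException));
        System.out.println("cause is NPE: " + (t.getCause() instanceof NullPointerException));
    }
}
```

The exception does not escape to the caller; it is captured by the dependent stage and handed to `whenComplete`. In the diff above, that stage calls `callback.operationFailed`, so an unguarded `null` would presumably surface as a reported failure rather than a silent skip.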



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2544,63 +2576,32 @@ void createNewMetadataLedger(final VoidCallback callback) {
                 if (rc != BKException.Code.OK) {
                     log.warn("[{}] Error creating ledger for cursor {}: {}", ledger.getName(), name,
                             BKException.getMessage(rc));
-                    callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc)));
+                    future.completeExceptionally(new ManagedLedgerException(BKException.getMessage(rc)));
                     return;
                 }
 
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Created ledger {} for cursor {}", ledger.getName(), lh.getId(), name);
                 }
-                // Created the ledger, now write the last position
-                // content
-                MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
-                persistPositionToLedger(lh, mdEntry, new VoidCallback() {
-                    @Override
-                    public void operationComplete() {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(),
-                                    mdEntry.newPosition, name);
-                        }
-                        switchToNewLedger(lh, new VoidCallback() {
-                            @Override
-                            public void operationComplete() {
-                                callback.operationComplete();
-                            }
-
-                            @Override
-                            public void operationFailed(ManagedLedgerException exception) {
-                                // it means it failed to switch the newly created ledger so, it should be
-                                // deleted to prevent leak
-                                bookkeeper.asyncDeleteLedger(lh.getId(), (int rc, Object ctx) -> {
-                                    if (rc != BKException.Code.OK) {
-                                        log.warn("[{}] Failed to delete orphan ledger {}", ledger.getName(),
-                                                lh.getId());
-                                    }
-                                }, null);
-                                callback.operationFailed(exception);
-                            }
-                        });
-                    }
-
-                    @Override
-                    public void operationFailed(ManagedLedgerException exception) {
-                        log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(),
-                                mdEntry.newPosition, name);
-
-                        ledger.mbean.startCursorLedgerDeleteOp();
-                        bookkeeper.asyncDeleteLedger(lh.getId(), new DeleteCallback() {
-                            @Override
-                            public void deleteComplete(int rc, Object ctx) {
-                                ledger.mbean.endCursorLedgerDeleteOp();
-                            }
-                        }, null);
-                        callback.operationFailed(exception);
-                    }
-                });
+                future.complete(lh);
             }));
         }, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name));
+
+        return future;
     }
 
+    private void deleteLedger(LedgerHandle ledgerHandle) {

Review Comment:
   Should we return `CompletableFuture` to make this method more reusable?
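
One possible shape for this suggestion, as a sketch under stated assumptions: the delete helper returns a `CompletableFuture<Void>` so callers can either ignore the result or chain on it. `DeleteAsyncSketch`, `DeleteDone`, and `asyncDelete` are hypothetical stand-ins, not the BookKeeper client API, and the method name `deleteLedgerAsync` is borrowed from the thread only as a label.

```java
import java.util.concurrent.CompletableFuture;

// Sketch only: hypothetical stand-ins, not the BookKeeper client API.
final class DeleteAsyncSketch {

    // Callback-style completion handler for the fake delete call.
    interface DeleteDone {
        void deleteComplete(int rc);
    }

    // Fake async delete: succeeds (rc 0) for non-negative ids, fails otherwise.
    static void asyncDelete(long ledgerId, DeleteDone cb) {
        cb.deleteComplete(ledgerId >= 0 ? 0 : -1);
    }

    // Returns a future instead of void so the outcome is observable by callers.
    static CompletableFuture<Void> deleteLedgerAsync(long ledgerId) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        asyncDelete(ledgerId, rc -> {
            if (rc == 0) {
                future.complete(null);
            } else {
                future.completeExceptionally(new IllegalStateException(
                        "delete failed for ledger " + ledgerId + ", rc=" + rc));
            }
        });
        return future;
    }

    public static void main(String[] args) {
        deleteLedgerAsync(7L)
                .thenRun(() -> System.out.println("deleted"))
                .exceptionally(e -> {
                    System.out.println("not deleted: " + e.getMessage());
                    return null;
                });
    }
}
```

A fire-and-forget caller can simply drop the returned future, so this shape does not force existing call sites to change their behavior.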





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15425: [PIP 81] Part-1 Split createNewMetadataLedger into multiple methods for reuse

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15425:
URL: https://github.com/apache/pulsar/pull/15425#discussion_r866557899


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2532,10 +2532,45 @@ void internalFlushPendingMarkDeletes() {
 
     void createNewMetadataLedger(final VoidCallback callback) {
         ledger.mbean.startCursorLedgerCreateOp();
+        doCreateNewMetadataLedger().thenAccept(newLedgerHandle -> {
+            if (newLedgerHandle == null) {
+                return;
+            }
+            MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
+            // Created the ledger, now write the last position content
+            persistPositionToLedger(newLedgerHandle, mdEntry, new VoidCallback() {
+                @Override
+                public void operationComplete() {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(),
+                                mdEntry.newPosition, name);
+                    }
+                    switchToNewLedger(newLedgerHandle, callback);
+                }
+
+                @Override
+                public void operationFailed(ManagedLedgerException exception) {
+                    log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(),
+                            mdEntry.newPosition, name);
+
+                    deleteLedgerAsync(newLedgerHandle);
+                    callback.operationFailed(exception);
+                }
+            });
+        }).whenComplete((result, e) -> {
+            ledger.mbean.endCursorLedgerCreateOp();
+            if (e != null) {
+                callback.operationFailed(createManagedLedgerException(e));
+            }
+        });
+    }
 
+    private CompletableFuture<LedgerHandle> doCreateNewMetadataLedger() {
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
         ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> {
 
             if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
+                future.complete(null);

Review Comment:
   Question:
   Can we return an exceptionally completed future here? Returning null directly is a bit surprising to other callers and prone to causing an NPE.
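
A minimal sketch of the alternative raised in this question: signal the "nothing to do" case with a dedicated exception instead of `null`, and let the caller distinguish it from a real failure. `ExceptionalResultSketch` and `AlreadyHandledException` are hypothetical names invented for illustration, and a `String` stands in for the ledger handle.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Sketch only: hypothetical names; not the code proposed in the PR.
final class ExceptionalResultSketch {

    // Hypothetical marker meaning "the operation was already completed elsewhere".
    static final class AlreadyHandledException extends RuntimeException {
        AlreadyHandledException(String message) {
            super(message);
        }
    }

    // Completes exceptionally with the marker instead of completing with null.
    static CompletableFuture<String> doCreate(boolean alreadyHandled) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (alreadyHandled) {
            future.completeExceptionally(new AlreadyHandledException("nothing to persist"));
        } else {
            future.complete("ledger-1");
        }
        return future;
    }

    // Returns "persisted:<handle>", "skipped", or "failed".
    static String create(boolean alreadyHandled) {
        return doCreate(alreadyHandled)
                .thenApply(handle -> "persisted:" + handle)
                .exceptionally(e -> {
                    // Dependent stages see the original exception wrapped in CompletionException.
                    Throwable cause = (e instanceof CompletionException && e.getCause() != null)
                            ? e.getCause() : e;
                    return cause instanceof AlreadyHandledException ? "skipped" : "failed";
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(create(false));
        System.out.println(create(true));
    }
}
```

The trade-off is that every consumer must now filter the marker exception; without that filter, a handler like the `whenComplete` stage in the diff would treat it as an ordinary failure.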





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15425: [PIP 81] Part-1 Split createNewMetadataLedger into multiple methods for reuse

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15425:
URL: https://github.com/apache/pulsar/pull/15425#discussion_r866594597


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2532,10 +2532,45 @@ void internalFlushPendingMarkDeletes() {
 
     void createNewMetadataLedger(final VoidCallback callback) {
         ledger.mbean.startCursorLedgerCreateOp();
+        doCreateNewMetadataLedger().thenAccept(newLedgerHandle -> {
+            if (newLedgerHandle == null) {
+                return;
+            }
+            MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
+            // Created the ledger, now write the last position content
+            persistPositionToLedger(newLedgerHandle, mdEntry, new VoidCallback() {
+                @Override
+                public void operationComplete() {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(),
+                                mdEntry.newPosition, name);
+                    }
+                    switchToNewLedger(newLedgerHandle, callback);
+                }
+
+                @Override
+                public void operationFailed(ManagedLedgerException exception) {
+                    log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(),
+                            mdEntry.newPosition, name);
+
+                    deleteLedgerAsync(newLedgerHandle);
+                    callback.operationFailed(exception);
+                }
+            });
+        }).whenComplete((result, e) -> {
+            ledger.mbean.endCursorLedgerCreateOp();
+            if (e != null) {
+                callback.operationFailed(createManagedLedgerException(e));
+            }
+        });
+    }
 
+    private CompletableFuture<LedgerHandle> doCreateNewMetadataLedger() {
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
         ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> {
 
             if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
+                future.complete(null);

Review Comment:
   How about moving this logic out of this method? That would make this method purer, and you would not need to check the condition twice.
   ```java
   if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
       future.complete(null);
       return;
   }
   ```





[GitHub] [pulsar] 315157973 commented on a diff in pull request #15425: [PIP 81] Part-1 Split createNewMetadataLedger into multiple methods for reuse

Posted by GitBox <gi...@apache.org>.
315157973 commented on code in PR #15425:
URL: https://github.com/apache/pulsar/pull/15425#discussion_r866597342


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2532,10 +2532,45 @@ void internalFlushPendingMarkDeletes() {
 
     void createNewMetadataLedger(final VoidCallback callback) {
         ledger.mbean.startCursorLedgerCreateOp();
+        doCreateNewMetadataLedger().thenAccept(newLedgerHandle -> {
+            if (newLedgerHandle == null) {
+                return;
+            }
+            MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
+            // Created the ledger, now write the last position content
+            persistPositionToLedger(newLedgerHandle, mdEntry, new VoidCallback() {
+                @Override
+                public void operationComplete() {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(),
+                                mdEntry.newPosition, name);
+                    }
+                    switchToNewLedger(newLedgerHandle, callback);
+                }
+
+                @Override
+                public void operationFailed(ManagedLedgerException exception) {
+                    log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(),
+                            mdEntry.newPosition, name);
+
+                    deleteLedgerAsync(newLedgerHandle);
+                    callback.operationFailed(exception);
+                }
+            });
+        }).whenComplete((result, e) -> {
+            ledger.mbean.endCursorLedgerCreateOp();
+            if (e != null) {
+                callback.operationFailed(createManagedLedgerException(e));
+            }
+        });
+    }
 
+    private CompletableFuture<LedgerHandle> doCreateNewMetadataLedger() {
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
         ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> {
 
             if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
+                future.complete(null);

Review Comment:
   If we move it out, the previous behavior changes: the following logic would be executed even if the handle is not available:
   `ledger.getExecutor().execute`





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #15425: [PIP 81] Part-1 Split createNewMetadataLedger into multiple methods for reuse

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #15425:
URL: https://github.com/apache/pulsar/pull/15425#discussion_r865903000


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2786,6 +2787,9 @@ public void operationComplete(Void result, Stat stat) {
             @Override
             public void operationFailed(MetaStoreException e) {
                 log.warn("[{}] Failed to update consumer {}", ledger.getName(), name, e);
+                // it means it failed to switch the newly created ledger so, it should be
+                // deleted to prevent leak
+                deleteLedger(lh);

Review Comment:
   Here, persisting the data to the metadata store failed, so why do we need to delete the ledger? Shouldn't we delete a ledger only after the data has been persisted to the metadata store successfully?



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2544,63 +2576,32 @@ void createNewMetadataLedger(final VoidCallback callback) {
                 if (rc != BKException.Code.OK) {
                     log.warn("[{}] Error creating ledger for cursor {}: {}", ledger.getName(), name,
                             BKException.getMessage(rc));
-                    callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc)));
+                    future.completeExceptionally(new ManagedLedgerException(BKException.getMessage(rc)));
                     return;
                 }
 
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Created ledger {} for cursor {}", ledger.getName(), lh.getId(), name);
                 }
-                // Created the ledger, now write the last position
-                // content
-                MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
-                persistPositionToLedger(lh, mdEntry, new VoidCallback() {
-                    @Override
-                    public void operationComplete() {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(),
-                                    mdEntry.newPosition, name);
-                        }
-                        switchToNewLedger(lh, new VoidCallback() {
-                            @Override
-                            public void operationComplete() {
-                                callback.operationComplete();
-                            }
-
-                            @Override
-                            public void operationFailed(ManagedLedgerException exception) {
-                                // it means it failed to switch the newly created ledger so, it should be
-                                // deleted to prevent leak
-                                bookkeeper.asyncDeleteLedger(lh.getId(), (int rc, Object ctx) -> {
-                                    if (rc != BKException.Code.OK) {
-                                        log.warn("[{}] Failed to delete orphan ledger {}", ledger.getName(),
-                                                lh.getId());
-                                    }
-                                }, null);
-                                callback.operationFailed(exception);
-                            }
-                        });
-                    }
-
-                    @Override
-                    public void operationFailed(ManagedLedgerException exception) {
-                        log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(),
-                                mdEntry.newPosition, name);
-
-                        ledger.mbean.startCursorLedgerDeleteOp();
-                        bookkeeper.asyncDeleteLedger(lh.getId(), new DeleteCallback() {
-                            @Override
-                            public void deleteComplete(int rc, Object ctx) {
-                                ledger.mbean.endCursorLedgerDeleteOp();
-                            }
-                        }, null);
-                        callback.operationFailed(exception);
-                    }
-                });
+                future.complete(lh);
             }));
         }, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name));
+
+        return future;
     }
 
+    private void deleteLedger(LedgerHandle ledgerHandle) {

Review Comment:
   ```suggestion
       private void deleteLedgerAsync(LedgerHandle ledgerHandle) {
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2544,63 +2576,32 @@ void createNewMetadataLedger(final VoidCallback callback) {
                 if (rc != BKException.Code.OK) {
                     log.warn("[{}] Error creating ledger for cursor {}: {}", ledger.getName(), name,
                             BKException.getMessage(rc));
-                    callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc)));
+                    future.completeExceptionally(new ManagedLedgerException(BKException.getMessage(rc)));
                     return;
                 }
 
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Created ledger {} for cursor {}", ledger.getName(), lh.getId(), name);
                 }
-                // Created the ledger, now write the last position
-                // content
-                MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
-                persistPositionToLedger(lh, mdEntry, new VoidCallback() {
-                    @Override
-                    public void operationComplete() {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(),
-                                    mdEntry.newPosition, name);
-                        }
-                        switchToNewLedger(lh, new VoidCallback() {
-                            @Override
-                            public void operationComplete() {
-                                callback.operationComplete();
-                            }
-
-                            @Override
-                            public void operationFailed(ManagedLedgerException exception) {
-                                // it means it failed to switch the newly created ledger so, it should be
-                                // deleted to prevent leak
-                                bookkeeper.asyncDeleteLedger(lh.getId(), (int rc, Object ctx) -> {
-                                    if (rc != BKException.Code.OK) {
-                                        log.warn("[{}] Failed to delete orphan ledger {}", ledger.getName(),
-                                                lh.getId());
-                                    }
-                                }, null);
-                                callback.operationFailed(exception);
-                            }
-                        });
-                    }
-
-                    @Override
-                    public void operationFailed(ManagedLedgerException exception) {
-                        log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(),
-                                mdEntry.newPosition, name);
-
-                        ledger.mbean.startCursorLedgerDeleteOp();
-                        bookkeeper.asyncDeleteLedger(lh.getId(), new DeleteCallback() {
-                            @Override
-                            public void deleteComplete(int rc, Object ctx) {
-                                ledger.mbean.endCursorLedgerDeleteOp();
-                            }
-                        }, null);
-                        callback.operationFailed(exception);
-                    }
-                });
+                future.complete(lh);
             }));
         }, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name));
+
+        return future;
     }
 
+    private void deleteLedger(LedgerHandle ledgerHandle) {

Review Comment:
   +1





[GitHub] [pulsar] 315157973 commented on a diff in pull request #15425: [PIP 81] Part-1 Split createNewMetadataLedger into multiple methods for reuse

Posted by GitBox <gi...@apache.org>.
315157973 commented on code in PR #15425:
URL: https://github.com/apache/pulsar/pull/15425#discussion_r866560902


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2532,10 +2532,45 @@ void internalFlushPendingMarkDeletes() {
 
     void createNewMetadataLedger(final VoidCallback callback) {
         ledger.mbean.startCursorLedgerCreateOp();
+        doCreateNewMetadataLedger().thenAccept(newLedgerHandle -> {
+            if (newLedgerHandle == null) {
+                return;
+            }
+            MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
+            // Created the ledger, now write the last position content
+            persistPositionToLedger(newLedgerHandle, mdEntry, new VoidCallback() {
+                @Override
+                public void operationComplete() {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(),
+                                mdEntry.newPosition, name);
+                    }
+                    switchToNewLedger(newLedgerHandle, callback);
+                }
+
+                @Override
+                public void operationFailed(ManagedLedgerException exception) {
+                    log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(),
+                            mdEntry.newPosition, name);
+
+                    deleteLedgerAsync(newLedgerHandle);
+                    callback.operationFailed(exception);
+                }
+            });
+        }).whenComplete((result, e) -> {
+            ledger.mbean.endCursorLedgerCreateOp();
+            if (e != null) {
+                callback.operationFailed(createManagedLedgerException(e));
+            }
+        });
+    }
 
+    private CompletableFuture<LedgerHandle> doCreateNewMetadataLedger() {
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
         ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> {
 
             if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
+                future.complete(null);

Review Comment:
   In the previous logic, if this check was true, the method returned directly. The caller needs to know whether the check was true, so null is returned. If the value is null, the metadata does not need to be persisted.
   Or do you have any other advice?
   





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15425: [PIP 81] Part-1 Split createNewMetadataLedger into multiple methods for reuse

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15425:
URL: https://github.com/apache/pulsar/pull/15425#discussion_r866609933


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2532,10 +2532,45 @@ void internalFlushPendingMarkDeletes() {
 
     void createNewMetadataLedger(final VoidCallback callback) {
         ledger.mbean.startCursorLedgerCreateOp();
+        doCreateNewMetadataLedger().thenAccept(newLedgerHandle -> {
+            if (newLedgerHandle == null) {
+                return;
+            }
+            MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
+            // Created the ledger, now write the last position content
+            persistPositionToLedger(newLedgerHandle, mdEntry, new VoidCallback() {
+                @Override
+                public void operationComplete() {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(),
+                                mdEntry.newPosition, name);
+                    }
+                    switchToNewLedger(newLedgerHandle, callback);
+                }
+
+                @Override
+                public void operationFailed(ManagedLedgerException exception) {
+                    log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(),
+                            mdEntry.newPosition, name);
+
+                    deleteLedgerAsync(newLedgerHandle);
+                    callback.operationFailed(exception);
+                }
+            });
+        }).whenComplete((result, e) -> {
+            ledger.mbean.endCursorLedgerCreateOp();
+            if (e != null) {
+                callback.operationFailed(createManagedLedgerException(e));
+            }
+        });
+    }
 
+    private CompletableFuture<LedgerHandle> doCreateNewMetadataLedger() {
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
         ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> {
 
             if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
+                future.complete(null);

Review Comment:
   Sorry, I missed the code that GitHub had folded. Although I think this could be optimized, there are too many race conditions here, so it is best not to change the logic.
   
   Thank you very much for your patient answer.





[GitHub] [pulsar] 315157973 commented on a diff in pull request #15425: [PIP 81] Part-1 Split createNewMetadataLedger into multiple methods for reuse

Posted by GitBox <gi...@apache.org>.
315157973 commented on code in PR #15425:
URL: https://github.com/apache/pulsar/pull/15425#discussion_r866060430


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2786,6 +2787,9 @@ public void operationComplete(Void result, Stat stat) {
             @Override
             public void operationFailed(MetaStoreException e) {
                 log.warn("[{}] Failed to update consumer {}", ledger.getName(), name, e);
+                // it means it failed to switch the newly created ledger so, it should be
+                // deleted to prevent leak
+                deleteLedger(lh);

Review Comment:
   <img width="810" alt="image" src="https://user-images.githubusercontent.com/9758905/166961907-d75f2020-b890-430c-b663-a5b41ddba4e0.png">
   I removed the redundant wrapper callback, so the ledger deletion logic had to be added here.
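
For readers following the thread, the point can be sketched in isolation: once the wrapper callback is gone, the failure path of the switch itself has to delete the ledger that was just created, otherwise it leaks. This is a minimal sketch, not the actual `ManagedCursorImpl` code. The `String` handle, the `metadataUpdateSucceeds` flag, and the in-memory `deleted` list are hypothetical stand-ins for `LedgerHandle`, the metadata store update, and the real `deleteLedger` call.

```java
import java.util.ArrayList;
import java.util.List;

final class SwitchLedgerSketch {
    /** Hypothetical stand-in for the managed-ledger VoidCallback. */
    interface VoidCallback {
        void operationComplete();

        void operationFailed(Exception e);
    }

    /** Records deleted handles so the cleanup can be observed (assumption for the sketch). */
    final List<String> deleted = new ArrayList<>();

    void deleteLedger(String ledgerHandle) {
        deleted.add(ledgerHandle);
    }

    /**
     * Switches to the new ledger. The flag simulates the outcome of the
     * metadata update; on failure the new ledger is deleted before the
     * caller is notified, so no extra wrapper callback is needed.
     */
    void switchToNewLedger(String ledgerHandle, boolean metadataUpdateSucceeds, VoidCallback callback) {
        if (metadataUpdateSucceeds) {
            callback.operationComplete();
            return;
        }
        // The switch failed, so the freshly created ledger is unused: delete it.
        deleteLedger(ledgerHandle);
        callback.operationFailed(new IllegalStateException("metadata update failed"));
    }
}
```

With the cleanup inside the failure branch, the caller can pass its own callback straight through instead of wrapping it.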





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15425: [PIP 81] Part-1 Split createNewMetadataLedger into multiple methods for reuse

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15425:
URL: https://github.com/apache/pulsar/pull/15425#discussion_r866590987


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2532,10 +2532,45 @@ void internalFlushPendingMarkDeletes() {
 
     void createNewMetadataLedger(final VoidCallback callback) {
         ledger.mbean.startCursorLedgerCreateOp();
+        doCreateNewMetadataLedger().thenAccept(newLedgerHandle -> {
+            if (newLedgerHandle == null) {
+                return;
+            }
+            MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
+            // Created the ledger, now write the last position content
+            persistPositionToLedger(newLedgerHandle, mdEntry, new VoidCallback() {
+                @Override
+                public void operationComplete() {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(),
+                                mdEntry.newPosition, name);
+                    }
+                    switchToNewLedger(newLedgerHandle, callback);
+                }
+
+                @Override
+                public void operationFailed(ManagedLedgerException exception) {
+                    log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(),
+                            mdEntry.newPosition, name);
+
+                    deleteLedgerAsync(newLedgerHandle);
+                    callback.operationFailed(exception);
+                }
+            });
+        }).whenComplete((result, e) -> {
+            ledger.mbean.endCursorLedgerCreateOp();
+            if (e != null) {
+                callback.operationFailed(createManagedLedgerException(e));
+            }
+        });
+    }
 
+    private CompletableFuture<LedgerHandle> doCreateNewMetadataLedger() {
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
         ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> {
 
             if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
+                future.complete(null);

Review Comment:
   How about moving this logic out of this method? That would make the method purer, and you would not need to check the condition twice.
   
   ```java
   if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
         future.complete(null);
         return;
   }
   ```
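
One possible reading of the early-return shape, sketched outside Pulsar: a callback-style create call is adapted to a `CompletableFuture`, and each branch completes the future exactly once and then returns. This is an illustrative sketch under stated assumptions, not the PR's implementation. `CreateCallback`, `asyncCreate`, the `String` handle, and the `alreadyHandled` flag are hypothetical stand-ins for the BookKeeper callback, `ledger.asyncCreateLedger`, `LedgerHandle`, and the result of `checkAndCompleteLedgerOpTask`.

```java
import java.util.concurrent.CompletableFuture;

final class LedgerCreateSketch {
    /** Hypothetical stand-in for a BookKeeper-style create callback. */
    interface CreateCallback {
        void createComplete(int rc, String handle);
    }

    /** Hypothetical async API; it invokes the callback immediately with the given result. */
    static void asyncCreate(int rc, String handle, CreateCallback cb) {
        cb.createComplete(rc, handle);
    }

    /**
     * Adapts the callback API to a future. A null result means "already
     * handled elsewhere, nothing to do", mirroring the null check on
     * newLedgerHandle in the diff above.
     */
    static CompletableFuture<String> create(int rc, String handle, boolean alreadyHandled) {
        CompletableFuture<String> future = new CompletableFuture<>();
        asyncCreate(rc, handle, (resultCode, lh) -> {
            if (alreadyHandled) {
                future.complete(null);
                return;
            }
            if (resultCode != 0) {
                future.completeExceptionally(
                        new IllegalStateException("ledger creation failed, rc=" + resultCode));
                return;
            }
            future.complete(lh);
        });
        return future;
    }
}
```

The design choice here is that every branch ends in a `return`, so no later code in the callback runs against a future that is already complete.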
   
   
   





[GitHub] [pulsar] codelipenghui merged pull request #15425: [PIP 81] Part-1 Split createNewMetadataLedger into multiple methods for reuse

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #15425:
URL: https://github.com/apache/pulsar/pull/15425

