Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/30 06:22:56 UTC

[GitHub] [pulsar] mattisonchao opened a new pull request #14940: [fix][Transaction] Fix unhandled exception when checkTopicNsOwnership.

mattisonchao opened a new pull request #14940:
URL: https://github.com/apache/pulsar/pull/14940


   ### Motivation
   
   - When ``TransactionMetadataStoreService#handleTcClientConnect`` calls ``checkTopicNsOwnership`` as shown below, a failure of the returned future is never handled.
   
   ```java
   pulsarService.getBrokerService().checkTopicNsOwnership(TopicName
                           .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) tcId.getId()).toString()).thenRun(() -> {
                                 //.... omit some code
                            })
   ```
   
   - Avoid frequent context switching between thread pools.
   
   ### Modifications
   
   - Return the future chained from ``checkTopicNsOwnership`` so that its failure reaches the caller.
   - Use ``thenComposeAsync`` to avoid context switching.
   - Refactor some code.
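
The difference between the two shapes can be illustrated with a minimal, self-contained sketch. It is not the PR's code: `OwnershipCheckSketch`, `checkOwnership`, `connectLosingErrors`, and `connectPropagatingErrors` are hypothetical names, and `checkOwnership` is a stand-in that always fails. The sketch uses plain `thenCompose`; the PR mentions `thenComposeAsync`, which propagates failures the same way.

```java
import java.util.concurrent.CompletableFuture;

public class OwnershipCheckSketch {

    // Hypothetical stand-in for checkTopicNsOwnership: always fails.
    static CompletableFuture<Void> checkOwnership() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("namespace not owned"));
        return future;
    }

    // Old shape: a separate result future is completed only in the success callback.
    // If the ownership check fails, thenRun skips the callback and `result` never completes.
    static CompletableFuture<Void> connectLosingErrors() {
        CompletableFuture<Void> result = new CompletableFuture<>();
        checkOwnership().thenRun(() -> result.complete(null));
        return result;
    }

    // New shape: return the chained future, so a failed check fails the returned future.
    static CompletableFuture<Void> connectPropagatingErrors() {
        return checkOwnership()
                .thenCompose(ignored -> CompletableFuture.<Void>completedFuture(null));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> lost = connectLosingErrors();
        System.out.println("old shape done: " + lost.isDone());

        CompletableFuture<Void> propagated = connectPropagatingErrors();
        System.out.println("new shape failed: " + propagated.isCompletedExceptionally());
    }
}
```

In the old shape the caller waits on a future that nothing will ever complete; in the new shape the caller sees the failure and can react to it.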
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   ### Documentation
   
   - [x] `no-need-doc` 
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] mattisonchao commented on a change in pull request #14940: [fix][transaction] Fix potentially unfinished CompletableFuture.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on a change in pull request #14940:
URL: https://github.com/apache/pulsar/pull/14940#discussion_r838337547



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -163,86 +165,63 @@ public boolean test(NamespaceBundle namespaceBundle) {
     }
 
     public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tcId) {
-        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-        internalPinnedExecutor.execute(() -> {
-            if (stores.get(tcId) != null) {
-                completableFuture.complete(null);
-            } else {
-                pulsarService.getBrokerService().checkTopicNsOwnership(TopicName
-                        .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) tcId.getId()).toString())
-                        .thenRun(() -> internalPinnedExecutor.execute(() -> {
-                    final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
-                            .computeIfAbsent(tcId.getId(), (id) -> new Semaphore(1));
-                    Deque<CompletableFuture<Void>> deque = pendingConnectRequests
-                            .computeIfAbsent(tcId.getId(), (id) -> new ConcurrentLinkedDeque<>());
-                    if (tcLoadSemaphore.tryAcquire()) {
-                        // when tcLoadSemaphore.release(), this command will acquire semaphore,
-                        // so we should jude the store exist again.
-                        if (stores.get(tcId) != null) {
-                            completableFuture.complete(null);
-                        }
-
-                        openTransactionMetadataStore(tcId).thenAccept((store) -> internalPinnedExecutor.execute(() -> {
-                            stores.put(tcId, store);
-                            LOG.info("Added new transaction meta store {}", tcId);
-                            long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
-                            while (true) {
-                                // prevent thread in a busy loop.
-                                if (System.currentTimeMillis() < endTime) {
-                                    CompletableFuture<Void> future = deque.poll();
-                                    if (future != null) {
-                                        // complete queue request future
-                                        future.complete(null);
-                                    } else {
-                                        break;
-                                    }
-                                } else {
-                                    deque.clear();
-                                    break;
+        if (stores.get(tcId) != null) {

Review comment:
       Another thing:
   It looks like we no longer need to acquire the semaphore or queue the other connections once we use a single-threaded pool.
   I'm not sure about that yet; still thinking it over.







[GitHub] [pulsar] mattisonchao closed pull request #14940: [fix][transaction] Remove single thread pool and fix the concurrent problem to avoid performance issues

Posted by GitBox <gi...@apache.org>.
mattisonchao closed pull request #14940:
URL: https://github.com/apache/pulsar/pull/14940


   





[GitHub] [pulsar] mattisonchao commented on pull request #14940: [fix][Transaction] Fix potentially unfinished CompletableFuture.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on pull request #14940:
URL: https://github.com/apache/pulsar/pull/14940#issuecomment-1082716449


   @codelipenghui @eolivelli @congbobo184 @Technoboy- 
   PTAL :)





[GitHub] [pulsar] Technoboy- closed pull request #14940: [fix][transaction] Fix potentially unfinished CompletableFuture.

Posted by GitBox <gi...@apache.org>.
Technoboy- closed pull request #14940:
URL: https://github.com/apache/pulsar/pull/14940


   





[GitHub] [pulsar] mattisonchao commented on a change in pull request #14940: [fix][transaction] Fix potentially unfinished CompletableFuture.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on a change in pull request #14940:
URL: https://github.com/apache/pulsar/pull/14940#discussion_r838390286



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -163,86 +165,63 @@ public boolean test(NamespaceBundle namespaceBundle) {
     }
 
     public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tcId) {
+        if (stores.get(tcId) != null) {

Review comment:
       @eolivelli 
   I think I found the root cause in #13969.
   I will change the code and try to remove the thread pool (which seriously reduces efficiency).
   







[GitHub] [pulsar] mattisonchao removed a comment on pull request #14940: [fix][transaction] Fix potentially unfinished CompletableFuture.

Posted by GitBox <gi...@apache.org>.
mattisonchao removed a comment on pull request #14940:
URL: https://github.com/apache/pulsar/pull/14940#issuecomment-1082716449


   @codelipenghui @eolivelli @congbobo184 @Technoboy- 
   PTAL :)





[GitHub] [pulsar] mattisonchao commented on a change in pull request #14940: [fix][transaction] Fix potentially unfinished CompletableFuture.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on a change in pull request #14940:
URL: https://github.com/apache/pulsar/pull/14940#discussion_r838265065



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -163,86 +165,63 @@ public boolean test(NamespaceBundle namespaceBundle) {
     }
 
     public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tcId) {
+        if (stores.get(tcId) != null) {

Review comment:
       Well, the second ``stores.get(tcId) != null`` check ensures thread safety,
   but it looks like moving the first check into the pinned executor would be better.







[GitHub] [pulsar] mattisonchao commented on a change in pull request #14940: [fix][transaction] Fix potentially unfinished CompletableFuture.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on a change in pull request #14940:
URL: https://github.com/apache/pulsar/pull/14940#discussion_r838300448



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -163,86 +165,63 @@ public boolean test(NamespaceBundle namespaceBundle) {
     }
 
     public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tcId) {
+        if (stores.get(tcId) != null) {

Review comment:
       Hi @eolivelli 
   I changed the method to run in a single thread and removed the second ``stores.get(tcId) != null`` check. Could you review it again?
   
   After thinking it over, I believe an executor that chooses its thread by TC id would be better than running everything in a single thread, but I should do that in a separate PR to keep this one simple.
   
   Let me know what you think! thanks~
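
As an illustration of the idea in the comment above (an executor that picks its thread by TC id), here is a minimal sketch. It is only an assumption about what such an executor could look like, not code from this PR or an existing Pulsar API: `KeyedExecutorSketch`, `indexFor`, and `run` are hypothetical names. Each id always maps to the same single-threaded worker, so tasks for one TC id run in order while different ids can run in parallel.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class KeyedExecutorSketch implements AutoCloseable {

    // One single-threaded worker per slot; tasks on a slot run one at a time, in order.
    private final ExecutorService[] workers;

    public KeyedExecutorSketch(int threads) {
        workers = new ExecutorService[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    // The same TC id always maps to the same worker index.
    public int indexFor(long tcId) {
        return (int) Math.floorMod(tcId, (long) workers.length);
    }

    // Run the task on the worker chosen by the TC id.
    public CompletableFuture<Void> run(long tcId, Runnable task) {
        return CompletableFuture.runAsync(task, workers[indexFor(tcId)]);
    }

    @Override
    public void close() {
        for (ExecutorService worker : workers) {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        try (KeyedExecutorSketch executor = new KeyedExecutorSketch(4)) {
            // Ids 1 and 5 share a worker (both map to index 1); id 2 uses another one.
            for (long tcId : new long[] {1L, 5L, 2L}) {
                executor.run(tcId, () -> System.out.println(
                        "tc " + tcId + " on " + Thread.currentThread().getName())).join();
            }
        }
    }
}
```

Compared with a single shared thread, this keeps per-id ordering without forcing unrelated TC ids to queue behind each other.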







[GitHub] [pulsar] mattisonchao commented on a change in pull request #14940: [fix][transaction] Fix potentially unfinished CompletableFuture.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on a change in pull request #14940:
URL: https://github.com/apache/pulsar/pull/14940#discussion_r838300448



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -163,86 +165,63 @@ public boolean test(NamespaceBundle namespaceBundle) {
     }
 
     public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tcId) {
+        if (stores.get(tcId) != null) {

Review comment:
       Hi @eolivelli 
   I changed the method to run in a single thread. Could you review it again?
   
   After thinking it over, I believe an executor that chooses its thread by TC id would be better than running everything in a single thread, but I should do that in a separate PR to keep this one simple.
   
   Let me know what you think! thanks~







[GitHub] [pulsar] eolivelli commented on a change in pull request #14940: [fix][transaction] Fix potentially unfinished CompletableFuture.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #14940:
URL: https://github.com/apache/pulsar/pull/14940#discussion_r838241924



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -163,86 +165,63 @@ public boolean test(NamespaceBundle namespaceBundle) {
     }
 
     public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tcId) {
+        if (stores.get(tcId) != null) {

Review comment:
       Before this change, we were accessing this variable inside the pinned executor.
   I am not sure this change is correct.













[GitHub] [pulsar] mattisonchao commented on pull request #14940: [fix][transaction] Remove single thread pool and fix the concurrent problem to avoid performance issues

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on pull request #14940:
URL: https://github.com/apache/pulsar/pull/14940#issuecomment-1083998210


   @eolivelli  I updated this PR, could you please review it again?

