Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/05/11 04:59:43 UTC

[GitHub] [pulsar] hangc0276 opened a new pull request #10536: fix lock manager npe when broker shutting down

hangc0276 opened a new pull request #10536:
URL: https://github.com/apache/pulsar/pull/10536


   ### Motivation
   During broker shutdown, the broker throws the following exception:
   ```
   11:31:55.887 [metadata-store-5-1] ERROR org.apache.pulsar.metadata.impl.AbstractMetadataStore - Failed to process metadata store notification
   java.util.ConcurrentModificationException: null
           at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553) ~[?:1.8.0_131]
           at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_131]
           at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[?:1.8.0_131]
           at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) ~[?:1.8.0_131]
           at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) ~[?:1.8.0_131]
           at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_131]
           at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_131]
           at org.apache.pulsar.metadata.coordination.impl.LockManagerImpl.handleDataNotification(LockManagerImpl.java:120) ~[pulsar-metadata-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
           at org.apache.pulsar.metadata.impl.AbstractMetadataStore.lambda$null$0(AbstractMetadataStore.java:141) ~[pulsar-metadata-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
           at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:890) ~[?:1.8.0_131]
           at org.apache.pulsar.metadata.impl.AbstractMetadataStore.lambda$receivedNotification$1(AbstractMetadataStore.java:139) ~[pulsar-metadata-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
           at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) [?:1.8.0_131]
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_131]
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_131]
           at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.63.Final.jar:4.1.63.Final]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
   ```
   
   When using `locks.stream.filter()`, it get a set contains the result. Then use foreach to traverse the elements. 
   However, when the elements remove by another thread, the foreach will throw exception.
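A minimal, self-contained sketch of the failure mode (not Pulsar code): a plain `HashSet<String>` of hypothetical lock paths stands in for `locks`, and the removal is done inside `forEach` on the same thread, rather than from another thread, so the outcome is deterministic. `HashMap$KeySpliterator.forEachRemaining` compares the set's modification count once it finishes traversing and throws, which matches the top frame of the stack trace above.

```java
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Set;

class StreamCmeDemo {
    // Returns true if locks.stream().filter(...).forEach(...) over a HashSet fails with
    // ConcurrentModificationException because the set changed during the traversal.
    static boolean streamTraversalFails() {
        Set<String> locks = new HashSet<>();
        for (int i = 0; i < 10; i++) {
            locks.add("/lock-" + i); // hypothetical lock paths
        }
        try {
            locks.stream()
                    .filter(path -> path.startsWith("/lock-"))
                    // Stand-in for another thread removing a lock; it runs on the same
                    // thread here only so that the failure is deterministic.
                    .forEach(path -> locks.remove("/lock-9"));
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("stream traversal failed: " + streamTraversalFails());
    }
}
```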
   
   ### Modification
   1. Use a `for` loop instead of a stream to traverse the `locks` set.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] linlinnn commented on a change in pull request #10536: fix lock manager npe when broker shutting down

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10536:
URL: https://github.com/apache/pulsar/pull/10536#discussion_r629875513



##########
File path: pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
##########
@@ -115,9 +115,11 @@ private void handleSessionEvent(SessionEvent se) {
 
     private void handleDataNotification(Notification n) {
         if (n.getType() == NotificationType.Deleted) {
-            locks.stream()
-                    .filter(l -> l.getPath().equals(n.getPath()))
-                    .forEach(l -> l.lockWasInvalidated());
+            for (ResourceLockImpl<T> lock : locks) {

Review comment:
       This change alone does not solve the problem you describe: if another thread removes an element concurrently, the loop will still throw `ConcurrentModificationException`.
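To illustrate the reviewer's point with the same hypothetical setup (a `HashSet<String>` of made-up lock paths, modified on the same thread for determinism): replacing the stream with an enhanced `for` loop only moves the check. The set's fail-fast iterator detects the change on the next call to `next()` and still throws.

```java
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Set;

class ForLoopCmeDemo {
    // Returns true if an enhanced for loop over a HashSet throws
    // ConcurrentModificationException once the set is modified mid-iteration.
    static boolean forLoopTraversalFails() {
        Set<String> locks = new HashSet<>();
        for (int i = 0; i < 10; i++) {
            locks.add("/lock-" + i); // hypothetical lock paths
        }
        try {
            for (String path : locks) {
                // Stand-in for a concurrent removal by another thread; done on the
                // same thread so the result does not depend on timing.
                locks.remove("/lock-9");
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("for loop failed: " + forLoopTraversalFails());
    }
}
```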







[GitHub] [pulsar] hangc0276 commented on a change in pull request #10536: fix lock manager npe when broker shutting down

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on a change in pull request #10536:
URL: https://github.com/apache/pulsar/pull/10536#discussion_r630273630



##########
File path: pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
##########
@@ -115,9 +115,11 @@ private void handleSessionEvent(SessionEvent se) {
 
     private void handleDataNotification(Notification n) {
         if (n.getType() == NotificationType.Deleted) {
-            locks.stream()
-                    .filter(l -> l.getPath().equals(n.getPath()))
-                    .forEach(l -> l.lockWasInvalidated());
+            for (ResourceLockImpl<T> lock : locks) {

Review comment:
       @linlinnn Thanks for your feedback. Using `CopyOnWriteArraySet` instead of `HashSet` is the simplest way.
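A small sketch of why `CopyOnWriteArraySet` avoids the exception, again with hypothetical string paths instead of `ResourceLockImpl` objects: its iterator walks an immutable snapshot of the backing array taken when iteration starts, so removals during the loop neither throw nor change which elements the loop visits.

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

class CopyOnWriteDemo {
    // Removes one element while iterating and reports {elements visited, final size}.
    static int[] removeWhileIterating() {
        Set<String> locks = new CopyOnWriteArraySet<>();
        for (int i = 0; i < 10; i++) {
            locks.add("/lock-" + i); // hypothetical lock paths
        }
        int visited = 0;
        for (String path : locks) {
            // The iterator walks the array snapshot taken when the loop started, so
            // this removal neither throws nor changes what the loop sees.
            locks.remove("/lock-9");
            visited++;
        }
        return new int[] {visited, locks.size()};
    }

    public static void main(String[] args) {
        int[] r = removeWhileIterating();
        System.out.println("visited=" + r[0] + ", remaining=" + r[1]);
    }
}
```

`removeWhileIterating()` returns `{10, 9}`: the loop still sees all ten snapshot elements while the set itself shrinks to nine. The trade-off is that every `add`/`remove` copies the whole backing array.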







[GitHub] [pulsar] hangc0276 commented on pull request #10536: fix lock manager npe when broker shutting down

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #10536:
URL: https://github.com/apache/pulsar/pull/10536#issuecomment-838752533


   > switching to CopyOnWriteArraySet is not enough.
   > we already have this structure guarded by blocks like this:
   > 
   > ```
   > synchronized (LockManagerImpl.this) {
   > }
   > ```
   > 
   > I suggest simply enclosing the loop in a synchronized block
   > 
   > otherwise we must refactor all of the accesses to this variable
   
   @eolivelli If we just wrap the loop in a synchronized block, it will deadlock when `close()` is called.





[GitHub] [pulsar] hangc0276 commented on a change in pull request #10536: fix lock manager npe when broker shutting down

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on a change in pull request #10536:
URL: https://github.com/apache/pulsar/pull/10536#discussion_r629901596



##########
File path: pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
##########
@@ -115,9 +115,11 @@ private void handleSessionEvent(SessionEvent se) {
 
     private void handleDataNotification(Notification n) {
         if (n.getType() == NotificationType.Deleted) {
-            locks.stream()
-                    .filter(l -> l.getPath().equals(n.getPath()))
-                    .forEach(l -> l.lockWasInvalidated());
+            for (ResourceLockImpl<T> lock : locks) {

Review comment:
       @eolivelli @linlinnn Thanks for your replies. I've updated the code, PTAL.







[GitHub] [pulsar] dlg99 commented on a change in pull request #10536: fix lock manager npe when broker shutting down

Posted by GitBox <gi...@apache.org>.
dlg99 commented on a change in pull request #10536:
URL: https://github.com/apache/pulsar/pull/10536#discussion_r630363179



##########
File path: pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
##########
@@ -48,7 +49,7 @@
 @Slf4j
 class LockManagerImpl<T> implements LockManager<T> {
 
-    private final Set<ResourceLockImpl<T>> locks = new HashSet<>();
+    private final Set<ResourceLockImpl<T>> locks = new CopyOnWriteArraySet<>();

Review comment:
       > It is best suited for applications in which set sizes generally stay small, read-only operations vastly outnumber mutative operations, and you need to prevent interference among threads during traversal.
   > Mutative operations (add, set, remove, etc.) are expensive since they usually entail copying the entire underlying array.
   
   https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CopyOnWriteArraySet.html
   
   I am not sure about the size of the set or how often `acquireLock()` is called; I'm just checking that the copy-on-write semantics won't become a performance problem or increase GC pressure compared with the alternatives.

       Also, `synchronized (LockManagerImpl.this)` in `acquireLock()` and `locks = new HashSet<>(this.locks);` in `asyncClose()` are probably no longer needed.
   
   







[GitHub] [pulsar] merlimat commented on pull request #10536: fix lock manager npe when broker shutting down

Posted by GitBox <gi...@apache.org>.
merlimat commented on pull request #10536:
URL: https://github.com/apache/pulsar/pull/10536#issuecomment-846599662


   I already merged an alternative fix in #10680 to use ConcurrentHashMap
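The `ConcurrentHashMap`-based approach can be sketched as follows. This assumes the set is created with `ConcurrentHashMap.newKeySet()` and uses hypothetical string paths; the actual change in #10680 may be structured differently. Iterators and spliterators of a `ConcurrentHashMap` key set are weakly consistent, so the original `stream().filter(...).forEach(...)` shape no longer throws even when elements are removed during the traversal.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class ConcurrentKeySetDemo {
    // Same filter-then-act shape as handleDataNotification(), but over a concurrent set.
    static int removeMatchingWhileStreaming(String deletedPath) {
        // Set view backed by a ConcurrentHashMap: its iterators and spliterators are
        // weakly consistent and never throw ConcurrentModificationException.
        Set<String> locks = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < 10; i++) {
            locks.add("/lock-" + i); // hypothetical lock paths
        }
        locks.stream()
                .filter(path -> path.equals(deletedPath))
                // Hypothetical: the invalidated lock drops itself from the set.
                .forEach(locks::remove);
        return locks.size();
    }

    public static void main(String[] args) {
        System.out.println("remaining: " + removeMatchingWhileStreaming("/lock-3"));
    }
}
```

`removeMatchingWhileStreaming("/lock-3")` returns `9`. Weak consistency also means a traversal may or may not observe elements added or removed concurrently, which the handler has to tolerate.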





[GitHub] [pulsar] eolivelli commented on a change in pull request #10536: fix lock manager npe when broker shutting down

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10536:
URL: https://github.com/apache/pulsar/pull/10536#discussion_r629895490



##########
File path: pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
##########
@@ -115,9 +115,11 @@ private void handleSessionEvent(SessionEvent se) {
 
     private void handleDataNotification(Notification n) {
         if (n.getType() == NotificationType.Deleted) {
-            locks.stream()
-                    .filter(l -> l.getPath().equals(n.getPath()))
-                    .forEach(l -> l.lockWasInvalidated());
+            for (ResourceLockImpl<T> lock : locks) {

Review comment:
       @linlinnn is right: this change only rewrites the stream as a plain loop.
   
   Probably the best way is to scan the collection and build a list of the locks to be processed, then iterate over that list:
   
   ```
   // needs java.util.List and java.util.stream.Collectors
   List<ResourceLockImpl<T>> invalidated = locks.stream()
           .filter(l -> l.getPath().equals(n.getPath()))
           .collect(Collectors.toList());
   
   invalidated.forEach(l -> l.lockWasInvalidated());
   ```







[GitHub] [pulsar] linlinnn commented on a change in pull request #10536: fix lock manager npe when broker shutting down

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10536:
URL: https://github.com/apache/pulsar/pull/10536#discussion_r630099590



##########
File path: pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
##########
@@ -115,9 +115,11 @@ private void handleSessionEvent(SessionEvent se) {
 
     private void handleDataNotification(Notification n) {
         if (n.getType() == NotificationType.Deleted) {
-            locks.stream()
-                    .filter(l -> l.getPath().equals(n.getPath()))
-                    .forEach(l -> l.lockWasInvalidated());
+            for (ResourceLockImpl<T> lock : locks) {

Review comment:
       I don't think we can fix it this way.
   If another thread removes an element while the list is being built, `forEachRemaining(action)` will still throw `ConcurrentModificationException`.
   The simplest way is to use `CopyOnWriteArraySet` instead of `HashSet`, or to add a lock.
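The concern can be checked with a sketch of the collect-then-process variant, under the same assumptions as the other examples (hypothetical string paths, the concurrent removal simulated on the same thread). Building the list still traverses the `HashSet` through `forEachRemaining`, so a modification that overlaps the copy fails in exactly the same way; only the second loop, over the private list, is safe.

```java
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class CollectThenProcessDemo {
    // Returns true if building the "to be invalidated" list itself fails because the
    // HashSet was modified while the stream was still traversing it.
    static boolean collectFails(String deletedPath) {
        Set<String> locks = new HashSet<>();
        for (int i = 0; i < 10; i++) {
            locks.add("/lock-" + i); // hypothetical lock paths
        }
        try {
            List<String> invalidated = locks.stream()
                    .filter(path -> {
                        // Stand-in for another thread removing a lock while the list is
                        // being built (same thread here, for determinism).
                        locks.remove("/lock-9");
                        return path.equals(deletedPath);
                    })
                    .collect(Collectors.toList());
            // Only reached if nothing modified the set during the copy; iterating the
            // private list afterwards would then be safe.
            invalidated.forEach(path -> System.out.println("invalidate " + path));
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("collect failed: " + collectFails("/lock-3"));
    }
}
```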







[GitHub] [pulsar] merlimat commented on pull request #10536: fix lock manager npe when broker shutting down

Posted by GitBox <gi...@apache.org>.
merlimat commented on pull request #10536:
URL: https://github.com/apache/pulsar/pull/10536#issuecomment-839223234


   Instead of `CopyOnWriteArraySet`, I think it would be better to make `handleSessionEvent()` and `handleDataNotification()` synchronized.
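A minimal sketch of the synchronization approach, using a hypothetical `SynchronizedLockRegistry` in place of `LockManagerImpl`: every read and write of the plain `HashSet` goes through the same monitor, so a traversal can never overlap a modification. The sketch only collects matching paths while holding the monitor; invoking callbacks on the lock objects (as `lockWasInvalidated()` does) while holding it may be where the deadlock on `close()` mentioned elsewhere in this thread comes from, and the sketch does not model that.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for LockManagerImpl, not Pulsar's actual class: every access
// to the plain HashSet goes through the same monitor (this).
class SynchronizedLockRegistry {
    private final Set<String> lockPaths = new HashSet<>();

    synchronized void add(String path) {
        lockPaths.add(path);
    }

    synchronized boolean remove(String path) {
        return lockPaths.remove(path);
    }

    // Counterpart of a synchronized handleDataNotification(): while this runs, no other
    // thread can mutate lockPaths, so the traversal cannot see a concurrent change.
    synchronized List<String> handleDeleted(String deletedPath) {
        List<String> matches = new ArrayList<>();
        for (String path : lockPaths) {
            if (path.equals(deletedPath)) {
                matches.add(path);
            }
        }
        return matches;
    }

    // Runs a writer thread and a notification thread at the same time and reports
    // whether either one threw (neither should, since both use the same monitor).
    static boolean concurrentRunIsClean() throws InterruptedException {
        SynchronizedLockRegistry registry = new SynchronizedLockRegistry();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread writer = new Thread(() -> {
            try {
                for (int i = 0; i < 20_000; i++) {
                    registry.add("/lock-" + (i % 50));
                    registry.remove("/lock-" + ((i + 25) % 50));
                }
            } catch (Throwable t) {
                failure.compareAndSet(null, t);
            }
        });
        Thread notifier = new Thread(() -> {
            try {
                for (int i = 0; i < 20_000; i++) {
                    registry.handleDeleted("/lock-" + (i % 50));
                }
            } catch (Throwable t) {
                failure.compareAndSet(null, t);
            }
        });
        writer.start();
        notifier.start();
        writer.join();
        notifier.join();
        return failure.get() == null;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("clean concurrent run: " + concurrentRunIsClean());
    }
}
```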





[GitHub] [pulsar] eolivelli commented on pull request #10536: fix lock manager npe when broker shutting down

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10536:
URL: https://github.com/apache/pulsar/pull/10536#issuecomment-838761381


   What about setting a flag in `close()` and simply ignoring all notifications after that point?
   
   IIUC the deadlock would be between the thread executing `close()` and the thread serving the "node deleted" notification,
   but if you are closing, you can invalidate the locks inside the `close()` method.
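The closing-flag idea could look roughly like the following sketch. `ClosableLockRegistry`, `handleDeleted()` and `close()` are hypothetical stand-ins, and the invalidation callbacks are modeled as recording paths in a list. A `volatile` flag lets notifications that arrive after `close()` has started return without touching the set, while `close()` invalidates every remaining lock itself; the monitor is still used so that access to the set stays safe.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the "closed flag" idea; names are illustrative, not Pulsar's API.
class ClosableLockRegistry {
    private final Set<String> lockPaths = new HashSet<>();
    private final List<String> invalidated = new ArrayList<>();
    private volatile boolean closed = false;

    synchronized void add(String path) {
        lockPaths.add(path);
    }

    void handleDeleted(String deletedPath) {
        if (closed) {
            return; // close() has started and will invalidate every lock itself
        }
        synchronized (this) {
            if (closed) {
                return; // re-check: close() may have acquired the monitor first
            }
            for (Iterator<String> it = lockPaths.iterator(); it.hasNext(); ) {
                String path = it.next();
                if (path.equals(deletedPath)) {
                    it.remove(); // removing through the iterator is safe
                    invalidated.add(path);
                }
            }
        }
    }

    void close() {
        closed = true; // notifications arriving from now on are ignored
        synchronized (this) {
            invalidated.addAll(lockPaths);
            lockPaths.clear();
        }
    }

    synchronized List<String> invalidatedSnapshot() {
        return new ArrayList<>(invalidated);
    }

    synchronized int activeCount() {
        return lockPaths.size();
    }

    public static void main(String[] args) {
        ClosableLockRegistry registry = new ClosableLockRegistry();
        registry.add("/a");
        registry.add("/b");
        registry.handleDeleted("/a"); // invalidates /a
        registry.close();             // invalidates the remaining /b
        registry.handleDeleted("/b"); // ignored: already closed
        System.out.println(registry.invalidatedSnapshot().size() + " invalidated, "
                + registry.activeCount() + " active");
    }
}
```

Running `main` prints `2 invalidated, 0 active`: `/a` is invalidated by the notification, `/b` by `close()`, and the late notification for `/b` is ignored.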





[GitHub] [pulsar] hangc0276 commented on pull request #10536: fix lock manager npe when broker shutting down

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #10536:
URL: https://github.com/apache/pulsar/pull/10536#issuecomment-838591537


   > A good point was brought up by @linlinnn about the lack of thread-safety in the original implementation. I guess one solution would be to make both `handleSessionEvent` and `handleDataNotification` methods synchronized.
   
   @lhotari Simply using `synchronized` will cause a deadlock on close; I'm considering other solutions.





[GitHub] [pulsar] hangc0276 commented on pull request #10536: fix lock manager npe when broker shutting down

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #10536:
URL: https://github.com/apache/pulsar/pull/10536#issuecomment-846692234


   > I already merged an alternative fix in #10680 to use ConcurrentHashMap
   
   @merlimat Great, I'll close this PR.





[GitHub] [pulsar] hangc0276 closed pull request #10536: fix lock manager npe when broker shutting down

Posted by GitBox <gi...@apache.org>.
hangc0276 closed pull request #10536:
URL: https://github.com/apache/pulsar/pull/10536


   

