You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2020/12/11 21:40:53 UTC

[GitHub] [geode] kirklund commented on a change in pull request #5778: GEODE-7739: Address race condition in FederatingManager during listener creation

kirklund commented on a change in pull request #5778:
URL: https://github.com/apache/geode/pull/5778#discussion_r541313975



##########
File path: geode-core/src/main/java/org/apache/geode/management/internal/NotificationCacheListener.java
##########
@@ -14,101 +14,51 @@
  */
 package org.apache.geode.management.internal;
 
-
 import javax.management.Notification;
 
-import org.apache.geode.cache.CacheListener;
+import org.apache.geode.CancelCriterion;
 import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.internal.util.concurrent.StoppableCountDownLatch;
 
 /**
  * This listener will be attached to each notification region corresponding to a member
- *
  */
-public class NotificationCacheListener implements CacheListener<NotificationKey, Notification> {
-
-  /**
-   * For the
-   */
-  private NotificationHubClient notifClient;
+public class NotificationCacheListener extends CacheListenerAdapter<NotificationKey, Notification> {
 
-  private volatile boolean readyForEvents;
+  private final NotificationHubClient notifClient;
+  private final StoppableCountDownLatch readyForEvents;
 
-  public NotificationCacheListener(MBeanProxyFactory proxyHelper) {
-
-    notifClient = new NotificationHubClient(proxyHelper);
-    this.readyForEvents = false;
+  public NotificationCacheListener(MBeanProxyFactory proxyHelper, CancelCriterion cancelCriterion) {
+    notifClient =
+        new NotificationHubClient(proxyHelper);
 
+    this.readyForEvents =
+        new StoppableCountDownLatch(cancelCriterion, 1);
   }
 
   @Override
   public void afterCreate(EntryEvent<NotificationKey, Notification> event) {
-    if (!readyForEvents) {
-      return;
-    }
+    blockUntilReady();
     notifClient.sendNotification(event);
-
-  }
-
-  @Override
-  public void afterDestroy(EntryEvent<NotificationKey, Notification> event) {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void afterInvalidate(EntryEvent<NotificationKey, Notification> event) {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void afterRegionClear(RegionEvent<NotificationKey, Notification> event) {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void afterRegionCreate(RegionEvent<NotificationKey, Notification> event) {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void afterRegionDestroy(RegionEvent<NotificationKey, Notification> event) {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void afterRegionInvalidate(RegionEvent<NotificationKey, Notification> event) {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void afterRegionLive(RegionEvent<NotificationKey, Notification> event) {
-    // TODO Auto-generated method stub
-
   }
 
   @Override
   public void afterUpdate(EntryEvent<NotificationKey, Notification> event) {
-    if (!readyForEvents) {
-      return;
-    }
+    blockUntilReady();
     notifClient.sendNotification(event);
-
   }
 
-  @Override
-  public void close() {
-    // TODO Auto-generated method stub
-
+  private void blockUntilReady() {
+    try {
+      readyForEvents.await();
+    } catch (InterruptedException e) {
+      Thread.interrupted();

Review comment:
       ```
   try {
     readyForEvents.await();
   } catch (InterruptedException e) {
     Thread.currentThread().interrupt();
     throw new RuntimeException(e);
   }
   ```

##########
File path: geode-core/src/main/java/org/apache/geode/management/internal/ManagementCacheListener.java
##########
@@ -104,13 +101,19 @@ public void afterUpdate(EntryEvent<String, Object> event) {
       if (logger.isDebugEnabled()) {
         logger.debug("Proxy Update failed for {} with exception {}", objectName, e.getMessage(), e);
       }
-
     }
+  }
 
+  private void blockUntilReady() {
+    try {
+      readyForEvents.await();
+    } catch (InterruptedException e) {
+      Thread.interrupted();

Review comment:
       `Thread.interrupted()` only checks to see if the interrupt bit is set (returns boolean). You want to use `Thread.currentThread().interrupt()` like this:
   ```
   try {
     readyForEvents.await();
   } catch (InterruptedException e) {
     Thread.currentThread().interrupt();
     throw new RuntimeException(e);
   }
   ```
   

##########
File path: geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java
##########
@@ -380,8 +384,10 @@ void addMemberArtifacts(InternalDistributedMember member) {
         return;
       }
 
-      try {
+      FederatingManagerCancelCriterion cancelCriterion =

Review comment:
       Creating a new CancelCriterion instance isn't really what is needed here. You need to `getCancelCriterion` from either the Cache or the DistributedSystem and pass that into the constructor of the classes that are using it. Then if the DS or Cache are closing, the CancelCriterion will cause the FederatingManager to cancel with the cause being the Cache or DS being closed.
   
   If you look at #5728 (my PR with changes to FederatingManager), you'll see that the constructors are now passing in an instance of `CancelCriterion` which comes from `InternalDistributedSystem`:
   
   ```
   FederatingManager(ManagementResourceRepo repo, InternalDistributedSystem system,
         SystemManagementService service, InternalCache cache, StatisticsFactory statisticsFactory,
         StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory, MemberMessenger messenger,
         Supplier<ExecutorService> executorServiceSupplier) {
       this(repo, system, service, cache, statisticsFactory, statisticsClock, proxyFactory, messenger,
           system.getCancelCriterion(), executorServiceSupplier);
     }
   
     @VisibleForTesting
     FederatingManager(ManagementResourceRepo repo, InternalDistributedSystem system,
         SystemManagementService service, InternalCache cache, StatisticsFactory statisticsFactory,
         StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory, MemberMessenger messenger,
         CancelCriterion cancelCriterion, ExecutorService executorService) {
       this(repo, system, service, cache, statisticsFactory, statisticsClock, proxyFactory, messenger,
           cancelCriterion, () -> executorService);
     }
   
     @VisibleForTesting
     FederatingManager(ManagementResourceRepo repo, InternalDistributedSystem system,
         SystemManagementService service, InternalCache cache, StatisticsFactory statisticsFactory,
         StatisticsClock statisticsClock, MBeanProxyFactory proxyFactory, MemberMessenger messenger,
         CancelCriterion cancelCriterion, Supplier<ExecutorService> executorServiceSupplier) {
       super(repo, system, cache, statisticsFactory, statisticsClock);
       this.service = service;
       this.proxyFactory = proxyFactory;
       this.messenger = messenger;
       this.executorServiceSupplier = executorServiceSupplier;
       lifecycleLock = new StoppableReentrantLock(cancelCriterion);
     }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org