You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by sa...@apache.org on 2018/06/04 14:30:45 UTC
[geode] branch develop updated: GEODE-5252: Race in management
adapter could fail to create MXBeans. (#1993)
This is an automated email from the ASF dual-hosted git repository.
sai_boorlagadda pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new b731658 GEODE-5252: Race in management adapter could fail to create MXBeans. (#1993)
b731658 is described below
commit b73165836e765298d0cbef692737e9820021219a
Author: Sai Boorlagadda <sa...@gmail.com>
AuthorDate: Mon Jun 4 07:29:21 2018 -0700
GEODE-5252: Race in management adapter could fail to create MXBeans. (#1993)
Fixed a race condition that could cause the creation of MBeans to fail
while handling resource lifecycle change notifications.
* A read-write lock is added to synchronize between handling notifications
of cache creation/removal and handling other notifications.
* Added a test that exercises the synchronization for a fixed amount of time.
---
.../internal/beans/ManagementAdapter.java | 54 ++---
.../internal/beans/ManagementListener.java | 243 +++++++++++----------
.../internal/beans/ManagementAdapterTest.java | 160 ++++++++++++++
3 files changed, 318 insertions(+), 139 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
index 21bd6e6..732217e 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
@@ -135,7 +135,7 @@ public class ManagementAdapter {
*
* @param cache gemfire cache
*/
- public void handleCacheCreation(InternalCache cache) throws ManagementException {
+ protected void handleCacheCreation(InternalCache cache) throws ManagementException {
try {
this.internalCache = cache;
this.service =
@@ -193,7 +193,7 @@ public class ManagementAdapter {
/**
* Handles all the distributed mbean creation part when a Manager is started
*/
- public void handleManagerStart() throws ManagementException {
+ protected void handleManagerStart() throws ManagementException {
if (!isServiceInitialised("handleManagerStart")) {
return;
}
@@ -255,7 +255,7 @@ public class ManagementAdapter {
* Handles all the clean up activities when a Manager is stopped It clears the distributed mbeans
* and underlying data structures
*/
- public void handleManagerStop() throws ManagementException {
+ protected void handleManagerStop() throws ManagementException {
if (!isServiceInitialised("handleManagerStop")) {
return;
}
@@ -312,7 +312,7 @@ public class ManagementAdapter {
/**
* Assumption is always cache and MemberMbean has been will be created first
*/
- public void handleManagerCreation() throws ManagementException {
+ protected void handleManagerCreation() throws ManagementException {
if (!isServiceInitialised("handleManagerCreation")) {
return;
}
@@ -367,7 +367,7 @@ public class ManagementAdapter {
*
* @param disk the disk store for which the call back is invoked
*/
- public void handleDiskCreation(DiskStore disk) throws ManagementException {
+ protected void handleDiskCreation(DiskStore disk) throws ManagementException {
if (!isServiceInitialised("handleDiskCreation")) {
return;
}
@@ -390,7 +390,7 @@ public class ManagementAdapter {
* Handles LockService Creation
*
*/
- public void handleLockServiceCreation(DLockService lockService) throws ManagementException {
+ protected void handleLockServiceCreation(DLockService lockService) throws ManagementException {
if (!isServiceInitialised("handleLockServiceCreation")) {
return;
}
@@ -422,7 +422,7 @@ public class ManagementAdapter {
*
* @param sender the specific gateway sender
*/
- public void handleGatewaySenderCreation(GatewaySender sender) throws ManagementException {
+ protected void handleGatewaySenderCreation(GatewaySender sender) throws ManagementException {
if (!isServiceInitialised("handleGatewaySenderCreation")) {
return;
}
@@ -447,7 +447,7 @@ public class ManagementAdapter {
*
* @param recv specific gateway receiver
*/
- public void handleGatewayReceiverCreate(GatewayReceiver recv) throws ManagementException {
+ protected void handleGatewayReceiverCreate(GatewayReceiver recv) throws ManagementException {
if (!isServiceInitialised("handleGatewayReceiverCreate")) {
return;
}
@@ -480,7 +480,7 @@ public class ManagementAdapter {
*
* @param recv specific gateway receiver
*/
- public void handleGatewayReceiverDestroy(GatewayReceiver recv) throws ManagementException {
+ protected void handleGatewayReceiverDestroy(GatewayReceiver recv) throws ManagementException {
if (!isServiceInitialised("handleGatewayReceiverDestroy")) {
return;
}
@@ -504,7 +504,7 @@ public class ManagementAdapter {
*
* @param recv specific gateway receiver
*/
- public void handleGatewayReceiverStart(GatewayReceiver recv) throws ManagementException {
+ protected void handleGatewayReceiverStart(GatewayReceiver recv) throws ManagementException {
if (!isServiceInitialised("handleGatewayReceiverStart")) {
return;
}
@@ -529,7 +529,7 @@ public class ManagementAdapter {
*
* @param recv specific gateway receiver
*/
- public void handleGatewayReceiverStop(GatewayReceiver recv) throws ManagementException {
+ protected void handleGatewayReceiverStop(GatewayReceiver recv) throws ManagementException {
if (!isServiceInitialised("handleGatewayReceiverStop")) {
return;
}
@@ -544,7 +544,7 @@ public class ManagementAdapter {
memberLevelNotifEmitter.sendNotification(notification);
}
- public void handleAsyncEventQueueCreation(AsyncEventQueue queue) throws ManagementException {
+ protected void handleAsyncEventQueueCreation(AsyncEventQueue queue) throws ManagementException {
if (!isServiceInitialised("handleAsyncEventQueueCreation")) {
return;
}
@@ -568,7 +568,7 @@ public class ManagementAdapter {
*
* @param queue The AsyncEventQueue being removed
*/
- public void handleAsyncEventQueueRemoval(AsyncEventQueue queue) throws ManagementException {
+ protected void handleAsyncEventQueueRemoval(AsyncEventQueue queue) throws ManagementException {
if (!isServiceInitialised("handleAsyncEventQueueRemoval")) {
return;
}
@@ -604,7 +604,7 @@ public class ManagementAdapter {
* particular alert level
*
*/
- public void handleSystemNotification(AlertDetails details) {
+ protected void handleSystemNotification(AlertDetails details) {
if (!isServiceInitialised("handleSystemNotification")) {
return;
}
@@ -647,7 +647,7 @@ public class ManagementAdapter {
*
* @param cacheServer cache server instance
*/
- public void handleCacheServerStart(CacheServer cacheServer) {
+ protected void handleCacheServerStart(CacheServer cacheServer) {
if (!isServiceInitialised("handleCacheServerStart")) {
return;
}
@@ -685,7 +685,7 @@ public class ManagementAdapter {
*
* @param server cache server instance
*/
- public void handleCacheServerStop(CacheServer server) {
+ protected void handleCacheServerStop(CacheServer server) {
if (!isServiceInitialised("handleCacheServerStop")) {
return;
}
@@ -718,7 +718,7 @@ public class ManagementAdapter {
*
* @param cache GemFire Cache instance. For now client cache is not supported
*/
- public void handleCacheRemoval(Cache cache) throws ManagementException {
+ protected void handleCacheRemoval(Cache cache) throws ManagementException {
if (!isServiceInitialised("handleCacheRemoval")) {
return;
}
@@ -793,7 +793,7 @@ public class ManagementAdapter {
* Handles particular region destroy or close operation it will remove the corresponding MBean
*
*/
- public void handleRegionRemoval(Region region) throws ManagementException {
+ protected void handleRegionRemoval(Region region) throws ManagementException {
if (!isServiceInitialised("handleRegionRemoval")) {
return;
}
@@ -835,7 +835,7 @@ public class ManagementAdapter {
* Handles DiskStore Removal
*
*/
- public void handleDiskRemoval(DiskStore disk) throws ManagementException {
+ protected void handleDiskRemoval(DiskStore disk) throws ManagementException {
if (!isServiceInitialised("handleDiskRemoval")) {
return;
}
@@ -873,7 +873,7 @@ public class ManagementAdapter {
*
* @param lockService lock service instance
*/
- public void handleLockServiceRemoval(DLockService lockService) throws ManagementException {
+ protected void handleLockServiceRemoval(DLockService lockService) throws ManagementException {
if (!isServiceInitialised("handleLockServiceRemoval")) {
return;
}
@@ -900,7 +900,7 @@ public class ManagementAdapter {
*
* @param locator instance of locator which is getting started
*/
- public void handleLocatorStart(Locator locator) throws ManagementException {
+ protected void handleLocatorStart(Locator locator) throws ManagementException {
if (!isServiceInitialised("handleLocatorCreation")) {
return;
}
@@ -923,7 +923,7 @@ public class ManagementAdapter {
}
- public void handleGatewaySenderStart(GatewaySender sender) throws ManagementException {
+ protected void handleGatewaySenderStart(GatewaySender sender) throws ManagementException {
if (!isServiceInitialised("handleGatewaySenderStart")) {
return;
}
@@ -942,7 +942,7 @@ public class ManagementAdapter {
memberLevelNotifEmitter.sendNotification(notification);
}
- public void handleGatewaySenderStop(GatewaySender sender) throws ManagementException {
+ protected void handleGatewaySenderStop(GatewaySender sender) throws ManagementException {
if (!isServiceInitialised("handleGatewaySenderStop")) {
return;
}
@@ -954,7 +954,7 @@ public class ManagementAdapter {
memberLevelNotifEmitter.sendNotification(notification);
}
- public void handleGatewaySenderPaused(GatewaySender sender) throws ManagementException {
+ protected void handleGatewaySenderPaused(GatewaySender sender) throws ManagementException {
if (!isServiceInitialised("handleGatewaySenderPaused")) {
return;
}
@@ -966,7 +966,7 @@ public class ManagementAdapter {
memberLevelNotifEmitter.sendNotification(notification);
}
- public void handleGatewaySenderResumed(GatewaySender sender) throws ManagementException {
+ protected void handleGatewaySenderResumed(GatewaySender sender) throws ManagementException {
if (!isServiceInitialised("handleGatewaySenderResumed")) {
return;
}
@@ -978,7 +978,7 @@ public class ManagementAdapter {
memberLevelNotifEmitter.sendNotification(notification);
}
- public void handleGatewaySenderRemoved(GatewaySender sender) throws ManagementException {
+ protected void handleGatewaySenderRemoved(GatewaySender sender) throws ManagementException {
if (!isServiceInitialised("handleGatewaySenderRemoved")) {
return;
}
@@ -1000,7 +1000,7 @@ public class ManagementAdapter {
memberLevelNotifEmitter.sendNotification(notification);
}
- public void handleCacheServiceCreation(CacheService cacheService) throws ManagementException {
+ protected void handleCacheServiceCreation(CacheService cacheService) throws ManagementException {
if (!isServiceInitialised("handleCacheServiceCreation")) {
return;
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
index 5c2bcd7..17d1951 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
@@ -14,6 +14,8 @@
*/
package org.apache.geode.management.internal.beans;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
@@ -45,6 +47,10 @@ public class ManagementListener implements ResourceEventsListener {
private LogWriterI18n logger;
+ // having a readwrite lock to synchronize between handling cache creation/removal vs handling
+ // other notifications
+ private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
/**
* Constructor
*/
@@ -100,118 +106,131 @@ public class ManagementListener implements ResourceEventsListener {
if (!shouldProceed(event)) {
return;
}
- switch (event) {
- case CACHE_CREATE:
- InternalCache createdCache = (InternalCache) resource;
- adapter.handleCacheCreation(createdCache);
- break;
- case CACHE_REMOVE:
- InternalCache removedCache = (InternalCache) resource;
- adapter.handleCacheRemoval(removedCache);
- break;
- case REGION_CREATE:
- Region createdRegion = (Region) resource;
- adapter.handleRegionCreation(createdRegion);
- break;
- case REGION_REMOVE:
- Region removedRegion = (Region) resource;
- adapter.handleRegionRemoval(removedRegion);
- break;
- case DISKSTORE_CREATE:
- DiskStore createdDisk = (DiskStore) resource;
- adapter.handleDiskCreation(createdDisk);
- break;
- case DISKSTORE_REMOVE:
- DiskStore removedDisk = (DiskStore) resource;
- adapter.handleDiskRemoval(removedDisk);
- break;
- case GATEWAYRECEIVER_CREATE:
- GatewayReceiver createdRecv = (GatewayReceiver) resource;
- adapter.handleGatewayReceiverCreate(createdRecv);
- break;
- case GATEWAYRECEIVER_DESTROY:
- GatewayReceiver destroyedRecv = (GatewayReceiver) resource;
- adapter.handleGatewayReceiverDestroy(destroyedRecv);
- break;
- case GATEWAYRECEIVER_START:
- GatewayReceiver startedRecv = (GatewayReceiver) resource;
- adapter.handleGatewayReceiverStart(startedRecv);
- break;
- case GATEWAYRECEIVER_STOP:
- GatewayReceiver stoppededRecv = (GatewayReceiver) resource;
- adapter.handleGatewayReceiverStop(stoppededRecv);
- break;
- case GATEWAYSENDER_CREATE:
- GatewaySender sender = (GatewaySender) resource;
- adapter.handleGatewaySenderCreation(sender);
- break;
- case GATEWAYSENDER_START:
- GatewaySender startedSender = (GatewaySender) resource;
- adapter.handleGatewaySenderStart(startedSender);
- break;
- case GATEWAYSENDER_STOP:
- GatewaySender stoppedSender = (GatewaySender) resource;
- adapter.handleGatewaySenderStop(stoppedSender);
- break;
- case GATEWAYSENDER_PAUSE:
- GatewaySender pausedSender = (GatewaySender) resource;
- adapter.handleGatewaySenderPaused(pausedSender);
- break;
- case GATEWAYSENDER_RESUME:
- GatewaySender resumedSender = (GatewaySender) resource;
- adapter.handleGatewaySenderResumed(resumedSender);
- break;
- case GATEWAYSENDER_REMOVE:
- GatewaySender removedSender = (GatewaySender) resource;
- adapter.handleGatewaySenderRemoved(removedSender);
- break;
- case LOCKSERVICE_CREATE:
- DLockService createdLockService = (DLockService) resource;
- adapter.handleLockServiceCreation(createdLockService);
- break;
- case LOCKSERVICE_REMOVE:
- DLockService removedLockService = (DLockService) resource;
- adapter.handleLockServiceRemoval(removedLockService);
- break;
- case MANAGER_CREATE:
- adapter.handleManagerCreation();
- break;
- case MANAGER_START:
- adapter.handleManagerStart();
- break;
- case MANAGER_STOP:
- adapter.handleManagerStop();
- break;
- case ASYNCEVENTQUEUE_CREATE:
- AsyncEventQueue queue = (AsyncEventQueue) resource;
- adapter.handleAsyncEventQueueCreation(queue);
- break;
- case ASYNCEVENTQUEUE_REMOVE:
- AsyncEventQueue removedQueue = (AsyncEventQueue) resource;
- adapter.handleAsyncEventQueueRemoval(removedQueue);
- break;
- case SYSTEM_ALERT:
- AlertDetails details = (AlertDetails) resource;
- adapter.handleSystemNotification(details);
- break;
- case CACHE_SERVER_START:
- CacheServer startedServer = (CacheServer) resource;
- adapter.handleCacheServerStart(startedServer);
- break;
- case CACHE_SERVER_STOP:
- CacheServer stoppedServer = (CacheServer) resource;
- adapter.handleCacheServerStop(stoppedServer);
- break;
- case LOCATOR_START:
- Locator loc = (Locator) resource;
- adapter.handleLocatorStart(loc);
- break;
- case CACHE_SERVICE_CREATE:
- CacheService service = (CacheService) resource;
- adapter.handleCacheServiceCreation(service);
- break;
- default:
- break;
+ try {
+ if (event == ResourceEvent.CACHE_CREATE || event == ResourceEvent.CACHE_REMOVE) {
+ readWriteLock.writeLock().lock();
+ } else {
+ readWriteLock.readLock().lock();
+ }
+ switch (event) {
+ case CACHE_CREATE:
+ InternalCache createdCache = (InternalCache) resource;
+ adapter.handleCacheCreation(createdCache);
+ break;
+ case CACHE_REMOVE:
+ InternalCache removedCache = (InternalCache) resource;
+ adapter.handleCacheRemoval(removedCache);
+ break;
+ case REGION_CREATE:
+ Region createdRegion = (Region) resource;
+ adapter.handleRegionCreation(createdRegion);
+ break;
+ case REGION_REMOVE:
+ Region removedRegion = (Region) resource;
+ adapter.handleRegionRemoval(removedRegion);
+ break;
+ case DISKSTORE_CREATE:
+ DiskStore createdDisk = (DiskStore) resource;
+ adapter.handleDiskCreation(createdDisk);
+ break;
+ case DISKSTORE_REMOVE:
+ DiskStore removedDisk = (DiskStore) resource;
+ adapter.handleDiskRemoval(removedDisk);
+ break;
+ case GATEWAYRECEIVER_CREATE:
+ GatewayReceiver createdRecv = (GatewayReceiver) resource;
+ adapter.handleGatewayReceiverCreate(createdRecv);
+ break;
+ case GATEWAYRECEIVER_DESTROY:
+ GatewayReceiver destroyedRecv = (GatewayReceiver) resource;
+ adapter.handleGatewayReceiverDestroy(destroyedRecv);
+ break;
+ case GATEWAYRECEIVER_START:
+ GatewayReceiver startedRecv = (GatewayReceiver) resource;
+ adapter.handleGatewayReceiverStart(startedRecv);
+ break;
+ case GATEWAYRECEIVER_STOP:
+ GatewayReceiver stoppededRecv = (GatewayReceiver) resource;
+ adapter.handleGatewayReceiverStop(stoppededRecv);
+ break;
+ case GATEWAYSENDER_CREATE:
+ GatewaySender sender = (GatewaySender) resource;
+ adapter.handleGatewaySenderCreation(sender);
+ break;
+ case GATEWAYSENDER_START:
+ GatewaySender startedSender = (GatewaySender) resource;
+ adapter.handleGatewaySenderStart(startedSender);
+ break;
+ case GATEWAYSENDER_STOP:
+ GatewaySender stoppedSender = (GatewaySender) resource;
+ adapter.handleGatewaySenderStop(stoppedSender);
+ break;
+ case GATEWAYSENDER_PAUSE:
+ GatewaySender pausedSender = (GatewaySender) resource;
+ adapter.handleGatewaySenderPaused(pausedSender);
+ break;
+ case GATEWAYSENDER_RESUME:
+ GatewaySender resumedSender = (GatewaySender) resource;
+ adapter.handleGatewaySenderResumed(resumedSender);
+ break;
+ case GATEWAYSENDER_REMOVE:
+ GatewaySender removedSender = (GatewaySender) resource;
+ adapter.handleGatewaySenderRemoved(removedSender);
+ break;
+ case LOCKSERVICE_CREATE:
+ DLockService createdLockService = (DLockService) resource;
+ adapter.handleLockServiceCreation(createdLockService);
+ break;
+ case LOCKSERVICE_REMOVE:
+ DLockService removedLockService = (DLockService) resource;
+ adapter.handleLockServiceRemoval(removedLockService);
+ break;
+ case MANAGER_CREATE:
+ adapter.handleManagerCreation();
+ break;
+ case MANAGER_START:
+ adapter.handleManagerStart();
+ break;
+ case MANAGER_STOP:
+ adapter.handleManagerStop();
+ break;
+ case ASYNCEVENTQUEUE_CREATE:
+ AsyncEventQueue queue = (AsyncEventQueue) resource;
+ adapter.handleAsyncEventQueueCreation(queue);
+ break;
+ case ASYNCEVENTQUEUE_REMOVE:
+ AsyncEventQueue removedQueue = (AsyncEventQueue) resource;
+ adapter.handleAsyncEventQueueRemoval(removedQueue);
+ break;
+ case SYSTEM_ALERT:
+ AlertDetails details = (AlertDetails) resource;
+ adapter.handleSystemNotification(details);
+ break;
+ case CACHE_SERVER_START:
+ CacheServer startedServer = (CacheServer) resource;
+ adapter.handleCacheServerStart(startedServer);
+ break;
+ case CACHE_SERVER_STOP:
+ CacheServer stoppedServer = (CacheServer) resource;
+ adapter.handleCacheServerStop(stoppedServer);
+ break;
+ case LOCATOR_START:
+ Locator loc = (Locator) resource;
+ adapter.handleLocatorStart(loc);
+ break;
+ case CACHE_SERVICE_CREATE:
+ CacheService service = (CacheService) resource;
+ adapter.handleCacheServiceCreation(service);
+ break;
+ default:
+ break;
+ }
+ } finally {
+ if (event == ResourceEvent.CACHE_CREATE || event == ResourceEvent.CACHE_REMOVE) {
+ readWriteLock.writeLock().unlock();
+ } else {
+ readWriteLock.readLock().unlock();
+ }
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/beans/ManagementAdapterTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/beans/ManagementAdapterTest.java
new file mode 100644
index 0000000..c44473c
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/beans/ManagementAdapterTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.beans;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Scanner;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.distributed.internal.ResourceEvent;
+import org.apache.geode.internal.cache.DiskStoreImpl;
+import org.apache.geode.internal.cache.DiskStoreStats;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.apache.geode.test.junit.rules.ServerStarterRule;
+
+@Category(IntegrationTest.class)
+public class ManagementAdapterTest {
+
+ private InternalCache cache = null;
+ private DiskStoreImpl diskStore = mock(DiskStoreImpl.class);
+ private volatile boolean race = false;
+
+ @Rule
+ public ServerStarterRule serverRule =
+ new ServerStarterRule().withWorkingDir().withLogFile().withAutoStart();
+
+ @Before
+ public void before() {
+ cache = serverRule.getCache();
+ doReturn(new DiskStoreStats(cache.getInternalDistributedSystem(), "disk-stats")).when(diskStore)
+ .getStats();
+ doReturn(new File[] {}).when(diskStore).getDiskDirs();
+ }
+
+ @Test
+ public void testHandlingNotificationsConcurrently() throws InterruptedException {
+ /*
+ * Tests to see if there are any concurrency issues handling resource lifecycle events.
+ *
+ * There are three runnables with specific tasks as below:
+ * r1 - continuously send cache creation/removal notifications, thread modifying the state
+ * r2 - continuously send disk creation/removal, thread relying on state
+ * r3 - monitors the log for a null pointer exception caused by a race
+ *
+ * Test runs at most 2 seconds or until a race.
+ */
+
+ Runnable r1 = () -> {
+ while (!race) {
+ try {
+ cache.getInternalDistributedSystem().handleResourceEvent(ResourceEvent.CACHE_REMOVE,
+ cache);
+ Thread.sleep(10);
+ cache.getInternalDistributedSystem().handleResourceEvent(ResourceEvent.CACHE_CREATE,
+ cache);
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ Runnable r2 = () -> {
+ while (!race) {
+ try {
+ cache.getInternalDistributedSystem().handleResourceEvent(ResourceEvent.DISKSTORE_CREATE,
+ diskStore);
+ Thread.sleep(5);
+ cache.getInternalDistributedSystem().handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE,
+ diskStore);
+ Thread.sleep(5);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ // r3 scans the server log for a null pointer exception caused by cache removal.
+ Runnable r3 = () -> {
+ while (!race) {
+ try {
+ File logFile = new File(serverRule.getWorkingDir() + "/server.log");
+ Scanner scanner = new Scanner(logFile);
+ while (scanner.hasNextLine()) {
+ final String lineFromFile = scanner.nextLine();
+ if (lineFromFile.contains("java.lang.NullPointerException")) {
+ race = true;
+ break;
+ }
+ }
+ } catch (FileNotFoundException e) {
+ // ignore this exception as the temp file might have been deleted after timeout
+ }
+ }
+ };
+
+ List<Runnable> runnables = Arrays.asList(r1, r2, r3);
+
+ final int numThreads = runnables.size();
+ final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<Throwable>());
+ final ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+ try {
+ final CountDownLatch allExecutorThreadsReady = new CountDownLatch(numThreads);
+ final CountDownLatch afterInitBlocker = new CountDownLatch(1);
+ final CountDownLatch allDone = new CountDownLatch(numThreads);
+ for (final Runnable submittedTestRunnable : runnables) {
+ threadPool.submit(() -> {
+ allExecutorThreadsReady.countDown();
+ try {
+ afterInitBlocker.await();
+ submittedTestRunnable.run();
+ } catch (final Throwable e) {
+ exceptions.add(e);
+ } finally {
+ allDone.countDown();
+ }
+ });
+ }
+ // wait until all threads are ready
+ allExecutorThreadsReady.await(runnables.size() * 10, TimeUnit.MILLISECONDS);
+ // start all test runners
+ afterInitBlocker.countDown();
+ // wait until all done or timeout
+ allDone.await(2, TimeUnit.SECONDS);
+ } finally {
+ threadPool.shutdownNow();
+ }
+ assertThat(exceptions).as("failed with exception(s)" + exceptions).isEmpty();
+ assertThat(race).as("is service to be null due to race").isEqualTo(false);
+ }
+}
--
To stop receiving notification emails like this one, please contact
sai_boorlagadda@apache.org.