You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2020/04/29 16:03:53 UTC
[geode] branch develop updated: GEODE-7503: Block Cache.close() until everything is disconnected (#4963)
This is an automated email from the ASF dual-hosted git repository.
klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new c12e8cc GEODE-7503: Block Cache.close() until everything is disconnected (#4963)
c12e8cc is described below
commit c12e8cc7221ba79aa60dfdee02059b847a4c787c
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Wed Apr 29 09:03:06 2020 -0700
GEODE-7503: Block Cache.close() until everything is disconnected (#4963)
Subsequent calls to GemFireCacheImpl.close() now block until the first
call to close() completes, instead of returning immediately from the
early-out checks. A new boolean argument (skipAwait) skips this wait when
cache close is invoked from the DisconnectListener in InternalLocator.
Additional changes:
* Make cache close reentrant
* Fix up GemFireCacheImplCloseTest flakiness
* Remove unused class
---
.../internal/InternalDistributedSystem.java | 2 +-
.../distributed/internal/InternalLocator.java | 2 +-
.../geode/internal/cache/GemFireCacheImpl.java | 458 +++++++++++----------
.../apache/geode/internal/cache/InternalCache.java | 3 +-
.../cache/InternalCacheForClientAccess.java | 6 +-
.../internal/cache/xmlcache/CacheCreation.java | 2 +-
.../internal/cache/GemFireCacheImplCloseTest.java | 56 ++-
.../geode/memcached/IntegrationJUnitTest.java | 4 +-
8 files changed, 307 insertions(+), 226 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
index a6b0ae3..d45df2e 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
@@ -1556,7 +1556,7 @@ public class InternalDistributedSystem extends DistributedSystem
isDisconnectThread.set(Boolean.TRUE); // bug #42663 - this must be set while
// closing the cache
try {
- currentCache.close(reason, dm.getRootCause(), keepAlive, true); // fix for 42150
+ currentCache.close(reason, dm.getRootCause(), keepAlive, true, false);
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
index 74cda22..288e5d9 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
@@ -996,7 +996,7 @@ public class InternalLocator extends Locator implements ConnectListener, LogConf
if (internalCache != null && !stoppedForReconnect && !forcedDisconnect) {
logger.info("Closing locator's cache");
try {
- internalCache.close();
+ internalCache.close("Normal disconnect", null, false, false, true);
} catch (RuntimeException ex) {
logger.info("Could not close locator's cache because: {}", ex.getMessage(), ex);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 9b2c9db..1b6d46f 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -352,6 +352,8 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
private static final ThreadLocal<GemFireCacheImpl> xmlCache = new ThreadLocal<>();
+ private static final ThreadLocal<Thread> CLOSING_THREAD = new ThreadLocal<>();
+
/**
* System property to limit the max query-execution time. By default its turned off (-1), the time
* is set in milliseconds.
@@ -615,6 +617,8 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
private volatile boolean isClosing;
+ private final CountDownLatch isClosedLatch = new CountDownLatch(1);
+
/**
* Set of all gateway senders. It may be fetched safely (for enumeration), but updates must by
* synchronized via {@link #allGatewaySendersLock}
@@ -1764,7 +1768,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
}
}
- close("Shut down all members", null, false, true);
+ close("Shut down all members", null, false, true, false);
} finally {
shutDownAllFinished.countDown();
}
@@ -1922,17 +1926,17 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
@Override
public void close(String reason, boolean keepAlive, boolean keepDS) {
- close(reason, null, keepAlive, keepDS);
+ close(reason, null, keepAlive, keepDS, false);
}
@Override
public void close(boolean keepAlive) {
- close("Normal disconnect", null, keepAlive, false);
+ close("Normal disconnect", null, keepAlive, false, false);
}
@Override
public void close(String reason, Throwable optionalCause) {
- close(reason, optionalCause, false, false);
+ close(reason, optionalCause, false, false, false);
}
@Override
@@ -2058,10 +2062,13 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
@Override
public void close(String reason, Throwable systemFailureCause, boolean keepAlive,
- boolean keepDS) {
+ boolean keepDS, boolean skipAwait) {
securityService.close();
if (isClosed()) {
+ if (!skipAwait && !Thread.currentThread().equals(CLOSING_THREAD.get())) {
+ waitUntilClosed();
+ }
return;
}
@@ -2078,285 +2085,306 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
return;
}
- boolean isDebugEnabled = logger.isDebugEnabled();
-
synchronized (GemFireCacheImpl.class) {
// ALL CODE FOR CLOSE SHOULD NOW BE UNDER STATIC SYNCHRONIZATION OF GemFireCacheImpl.class
// static synchronization is necessary due to static resources
if (isClosed()) {
- return;
- }
-
- // First close the ManagementService
- system.handleResourceEvent(ResourceEvent.CACHE_REMOVE, this);
- if (resourceEventsListener != null) {
- system.removeResourceListener(resourceEventsListener);
- resourceEventsListener = null;
- }
-
- if (systemFailureCause != null) {
- forcedDisconnect = systemFailureCause instanceof ForcedDisconnectException;
- if (forcedDisconnect) {
- disconnectCause = new ForcedDisconnectException(reason);
- } else {
- disconnectCause = systemFailureCause;
+ if (!skipAwait && !Thread.currentThread().equals(CLOSING_THREAD.get())) {
+ waitUntilClosed();
}
+ return;
}
- this.keepAlive = keepAlive;
- isClosing = true;
- logger.info("{}: Now closing.", this);
-
- // we don't clear the prID map if there is a system failure. Other
- // threads may be hung trying to communicate with the map locked
- if (systemFailureCause == null) {
- PartitionedRegion.clearPRIdMap();
- }
-
- TXStateProxy tx = null;
+ CLOSING_THREAD.set(Thread.currentThread());
try {
- if (transactionManager != null) {
- tx = transactionManager.pauseTransaction();
- }
-
- // do this before closing regions
- resourceManager.close();
+ boolean isDebugEnabled = logger.isDebugEnabled();
- try {
- resourceAdvisor.close();
- } catch (CancelException ignore) {
- }
- try {
- jmxAdvisor.close();
- } catch (CancelException ignore) {
+ // First close the ManagementService
+ system.handleResourceEvent(ResourceEvent.CACHE_REMOVE, this);
+ if (resourceEventsListener != null) {
+ system.removeResourceListener(resourceEventsListener);
+ resourceEventsListener = null;
}
- for (GatewaySender sender : allGatewaySenders) {
- try {
- sender.stop();
- GatewaySenderAdvisor advisor = ((AbstractGatewaySender) sender).getSenderAdvisor();
- if (advisor != null) {
- if (isDebugEnabled) {
- logger.debug("Stopping the GatewaySender advisor");
- }
- advisor.close();
- }
- } catch (CancelException ignore) {
+ if (systemFailureCause != null) {
+ forcedDisconnect = systemFailureCause instanceof ForcedDisconnectException;
+ if (forcedDisconnect) {
+ disconnectCause = new ForcedDisconnectException(reason);
+ } else {
+ disconnectCause = systemFailureCause;
}
}
- destroyGatewaySenderLockService();
+ this.keepAlive = keepAlive;
+ isClosing = true;
+ logger.info("{}: Now closing.", this);
- if (eventThreadPool != null) {
- if (isDebugEnabled) {
- logger.debug("{}: stopping event thread pool...", this);
- }
- eventThreadPool.shutdown();
+ // we don't clear the prID map if there is a system failure. Other
+ // threads may be hung trying to communicate with the map locked
+ if (systemFailureCause == null) {
+ PartitionedRegion.clearPRIdMap();
}
- // IMPORTANT: any operation during shut down that can time out (create a CancelException)
- // must be inside of this try block. If all else fails, we *must* ensure that the cache gets
- // closed!
+ TXStateProxy tx = null;
try {
- stopServers();
+ if (transactionManager != null) {
+ tx = transactionManager.pauseTransaction();
+ }
- stopServices();
+ // do this before closing regions
+ resourceManager.close();
- // no need to track PR instances
- if (isDebugEnabled) {
- logger.debug("{}: clearing partitioned regions...", this);
+ try {
+ resourceAdvisor.close();
+ } catch (CancelException ignore) {
}
- synchronized (partitionedRegions) {
- int prSize = -partitionedRegions.size();
- partitionedRegions.clear();
- getCachePerfStats().incPartitionedRegions(prSize);
+ try {
+ jmxAdvisor.close();
+ } catch (CancelException ignore) {
}
- prepareDiskStoresForClose();
+ for (GatewaySender sender : allGatewaySenders) {
+ try {
+ sender.stop();
+ GatewaySenderAdvisor advisor = ((AbstractGatewaySender) sender).getSenderAdvisor();
+ if (advisor != null) {
+ if (isDebugEnabled) {
+ logger.debug("Stopping the GatewaySender advisor");
+ }
+ advisor.close();
+ }
+ } catch (CancelException ignore) {
+ }
+ }
- Operation op;
- if (forcedDisconnect) {
- op = Operation.FORCED_DISCONNECT;
- } else if (isReconnecting()) {
- op = Operation.CACHE_RECONNECT;
- } else {
- op = Operation.CACHE_CLOSE;
+ destroyGatewaySenderLockService();
+
+ if (eventThreadPool != null) {
+ if (isDebugEnabled) {
+ logger.debug("{}: stopping event thread pool...", this);
+ }
+ eventThreadPool.shutdown();
}
- InternalRegion prRoot = null;
+ // IMPORTANT: any operation during shut down that can time out (create a CancelException)
+ // must be inside of this try block. If all else fails, we *must* ensure that the cache
+ // gets
+ // closed!
+ try {
+ stopServers();
+
+ stopServices();
- for (InternalRegion lr : rootRegions.values()) {
+ // no need to track PR instances
if (isDebugEnabled) {
- logger.debug("{}: processing region {}", this, lr.getFullPath());
+ logger.debug("{}: clearing partitioned regions...", this);
}
- if (PartitionedRegionHelper.PR_ROOT_REGION_NAME.equals(lr.getName())) {
- prRoot = lr;
+ synchronized (partitionedRegions) {
+ int prSize = -partitionedRegions.size();
+ partitionedRegions.clear();
+ getCachePerfStats().incPartitionedRegions(prSize);
+ }
+
+ prepareDiskStoresForClose();
+
+ Operation op;
+ if (forcedDisconnect) {
+ op = Operation.FORCED_DISCONNECT;
+ } else if (isReconnecting()) {
+ op = Operation.CACHE_RECONNECT;
} else {
- if (lr.getName().contains(ParallelGatewaySenderQueue.QSTRING)) {
- // this region will be closed internally by parent region
- continue;
- }
+ op = Operation.CACHE_CLOSE;
+ }
+
+ InternalRegion prRoot = null;
+
+ for (InternalRegion lr : rootRegions.values()) {
if (isDebugEnabled) {
- logger.debug("{}: closing region {}...", this, lr.getFullPath());
+ logger.debug("{}: processing region {}", this, lr.getFullPath());
}
- try {
- lr.handleCacheClose(op);
- } catch (RuntimeException e) {
- if (isDebugEnabled || !forcedDisconnect) {
- logger.warn(String.format("%s: error closing region %s", this, lr.getFullPath()),
- e);
+ if (PartitionedRegionHelper.PR_ROOT_REGION_NAME.equals(lr.getName())) {
+ prRoot = lr;
+ } else {
+ if (lr.getName().contains(ParallelGatewaySenderQueue.QSTRING)) {
+ // this region will be closed internally by parent region
+ continue;
+ }
+ if (isDebugEnabled) {
+ logger.debug("{}: closing region {}...", this, lr.getFullPath());
+ }
+ try {
+ lr.handleCacheClose(op);
+ } catch (RuntimeException e) {
+ if (isDebugEnabled || !forcedDisconnect) {
+ logger
+ .warn(String.format("%s: error closing region %s", this, lr.getFullPath()),
+ e);
+ }
}
}
}
- }
- try {
- if (isDebugEnabled) {
- logger.debug("{}: finishing partitioned region close...", this);
+ try {
+ if (isDebugEnabled) {
+ logger.debug("{}: finishing partitioned region close...", this);
+ }
+ PartitionedRegion.afterRegionsClosedByCacheClose(this);
+ if (prRoot != null) {
+ // do the PR meta root region last
+ prRoot.handleCacheClose(op);
+ }
+ } catch (CancelException e) {
+ logger.warn(
+ String.format("%s: error in last stage of PartitionedRegion cache close", this),
+ e);
}
- PartitionedRegion.afterRegionsClosedByCacheClose(this);
- if (prRoot != null) {
- // do the PR meta root region last
- prRoot.handleCacheClose(op);
+ destroyPartitionedRegionLockService();
+
+ closeDiskStores();
+ diskMonitor.close();
+
+ // Close the CqService Handle.
+ try {
+ if (isDebugEnabled) {
+ logger.debug("{}: closing CQ service...", this);
+ }
+ cqService.close();
+ } catch (RuntimeException ignore) {
+ logger.info("Failed to get the CqService, to close during cache close (1).");
}
- } catch (CancelException e) {
- logger.warn(
- String.format("%s: error in last stage of PartitionedRegion cache close", this), e);
- }
- destroyPartitionedRegionLockService();
- closeDiskStores();
- diskMonitor.close();
+ PoolManager.close(keepAlive);
- // Close the CqService Handle.
- try {
if (isDebugEnabled) {
- logger.debug("{}: closing CQ service...", this);
+ logger.debug("{}: notifying admins of close...", this);
+ }
+ try {
+ SystemMemberCacheEventProcessor.send(this, Operation.CACHE_CLOSE);
+ } catch (CancelException ignore) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Ignored cancellation while notifying admins");
+ }
}
- cqService.close();
- } catch (RuntimeException ignore) {
- logger.info("Failed to get the CqService, to close during cache close (1).");
- }
- PoolManager.close(keepAlive);
+ if (isDebugEnabled) {
+ logger.debug("{}: stopping destroyed entries processor...", this);
+ }
+ tombstoneService.stop();
- if (isDebugEnabled) {
- logger.debug("{}: notifying admins of close...", this);
- }
- try {
- SystemMemberCacheEventProcessor.send(this, Operation.CACHE_CLOSE);
- } catch (CancelException ignore) {
- if (logger.isDebugEnabled()) {
- logger.debug("Ignored cancellation while notifying admins");
+ // NOTICE: the CloseCache message is the *last* message you can send!
+ DistributionManager distributionManager = null;
+ try {
+ distributionManager = system.getDistributionManager();
+ distributionManager.removeMembershipListener(transactionManager);
+ } catch (CancelException ignore) {
}
- }
- if (isDebugEnabled) {
- logger.debug("{}: stopping destroyed entries processor...", this);
+ if (distributionManager != null) {
+ // Send CacheClosedMessage (and NOTHING ELSE) here
+ if (isDebugEnabled) {
+ logger.debug("{}: sending CloseCache to peers...", this);
+ }
+ Set<InternalDistributedMember> otherMembers =
+ distributionManager.getOtherDistributionManagerIds();
+ ReplyProcessor21 processor = replyProcessor21Factory.create(system, otherMembers);
+ CloseCacheMessage msg = new CloseCacheMessage();
+ msg.setRecipients(otherMembers);
+ msg.setProcessorId(processor.getProcessorId());
+ distributionManager.putOutgoing(msg);
+
+ try {
+ processor.waitForReplies();
+ } catch (InterruptedException ignore) {
+ // TODO: reset interrupt flag later?
+ // Keep going, make best effort to shut down.
+ } catch (ReplyException ignore) {
+ // keep going
+ }
+ // set closed state after telling others and getting responses to avoid complications
+ // with others still in the process of sending messages
+ }
+ // NO MORE Distributed Messaging AFTER THIS POINT!!!!
+
+ ClientMetadataService cms = clientMetadataService;
+ if (cms != null) {
+ cms.close();
+ }
+ closeHeapEvictor();
+ closeOffHeapEvictor();
+ } catch (CancelException ignore) {
+ // make sure the disk stores get closed
+ closeDiskStores();
+ // NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
}
- tombstoneService.stop();
- // NOTICE: the CloseCache message is the *last* message you can send!
- DistributionManager distributionManager = null;
+ // Close the CqService Handle.
try {
- distributionManager = system.getDistributionManager();
- distributionManager.removeMembershipListener(transactionManager);
- } catch (CancelException ignore) {
+ cqService.close();
+ } catch (RuntimeException ignore) {
+ logger.info("Failed to get the CqService, to close during cache close (2).");
}
- if (distributionManager != null) {
- // Send CacheClosedMessage (and NOTHING ELSE) here
- if (isDebugEnabled) {
- logger.debug("{}: sending CloseCache to peers...", this);
- }
- Set<InternalDistributedMember> otherMembers =
- distributionManager.getOtherDistributionManagerIds();
- ReplyProcessor21 processor = replyProcessor21Factory.create(system, otherMembers);
- CloseCacheMessage msg = new CloseCacheMessage();
- msg.setRecipients(otherMembers);
- msg.setProcessorId(processor.getProcessorId());
- distributionManager.putOutgoing(msg);
+ cachePerfStats.close();
+ TXLockService.destroyServices();
+ getEventTrackerTask().cancel();
- try {
- processor.waitForReplies();
- } catch (InterruptedException ignore) {
- // TODO: reset interrupt flag later?
- // Keep going, make best effort to shut down.
- } catch (ReplyException ignore) {
- // keep going
+ synchronized (ccpTimerMutex) {
+ if (ccpTimer != null) {
+ ccpTimer.cancel();
}
- // set closed state after telling others and getting responses to avoid complications
- // with others still in the process of sending messages
- }
- // NO MORE Distributed Messaging AFTER THIS POINT!!!!
-
- ClientMetadataService cms = clientMetadataService;
- if (cms != null) {
- cms.close();
}
- closeHeapEvictor();
- closeOffHeapEvictor();
- } catch (CancelException ignore) {
- // make sure the disk stores get closed
- closeDiskStores();
- // NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
- }
- // Close the CqService Handle.
- try {
- cqService.close();
- } catch (RuntimeException ignore) {
- logger.info("Failed to get the CqService, to close during cache close (2).");
- }
+ expirationScheduler.cancel();
- cachePerfStats.close();
- TXLockService.destroyServices();
- getEventTrackerTask().cancel();
+ // Stop QueryMonitor if running.
+ if (queryMonitor != null) {
+ queryMonitor.stopMonitoring();
+ }
- synchronized (ccpTimerMutex) {
- if (ccpTimer != null) {
- ccpTimer.cancel();
+ } finally {
+ // NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
+ if (transactionManager != null) {
+ transactionManager.close();
}
+ ((DynamicRegionFactoryImpl) DynamicRegionFactory.get()).close();
+ if (transactionManager != null) {
+ transactionManager.unpauseTransaction(tx);
+ }
+ TXCommitMessage.getTracker().clearForCacheClose();
}
- expirationScheduler.cancel();
+ // Added to close the TransactionManager's cleanup thread
+ TransactionManagerImpl.refresh();
- // Stop QueryMonitor if running.
- if (queryMonitor != null) {
- queryMonitor.stopMonitoring();
+ if (!keepDS) {
+ // keepDS is used by ShutdownAll. It will override disableDisconnectDsOnCacheClose
+ if (!disableDisconnectDsOnCacheClose) {
+ system.disconnect();
+ }
}
- } finally {
- // NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
- if (transactionManager != null) {
- transactionManager.close();
- }
- ((DynamicRegionFactoryImpl) DynamicRegionFactory.get()).close();
- if (transactionManager != null) {
- transactionManager.unpauseTransaction(tx);
- }
- TXCommitMessage.getTracker().clearForCacheClose();
- }
+ typeRegistryClose.run();
+ typeRegistrySetPdxSerializer.accept(null);
- // Added to close the TransactionManager's cleanup thread
- TransactionManagerImpl.refresh();
-
- if (!keepDS) {
- // keepDS is used by ShutdownAll. It will override disableDisconnectDsOnCacheClose
- if (!disableDisconnectDsOnCacheClose) {
- system.disconnect();
+ for (CacheLifecycleListener listener : cacheLifecycleListeners) {
+ listener.cacheClosed(this);
}
- }
- typeRegistryClose.run();
- typeRegistrySetPdxSerializer.accept(null);
+ SequenceLoggerImpl.signalCacheClose();
+ SystemFailure.signalCacheClose();
- for (CacheLifecycleListener listener : cacheLifecycleListeners) {
- listener.cacheClosed(this);
+ isClosedLatch.countDown();
+ } finally {
+ CLOSING_THREAD.remove();
}
+ }
+ }
- SequenceLoggerImpl.signalCacheClose();
- SystemFailure.signalCacheClose();
+ private void waitUntilClosed() {
+ try {
+ isClosedLatch.await();
+ } catch (InterruptedException ignore) {
+ // ignored
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
index ca7d632..15249e2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
@@ -416,7 +416,8 @@ public interface InternalCache extends Cache, Extensible<Cache>, CacheTime, Inte
*/
QueryMonitor getQueryMonitor();
- void close(String reason, Throwable systemFailureCause, boolean keepAlive, boolean keepDS);
+ void close(String reason, Throwable systemFailureCause, boolean keepAlive, boolean keepDS,
+ boolean skipAwait);
JmxManagerAdvisor getJmxManagerAdvisor();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheForClientAccess.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheForClientAccess.java
index 6969f47..10635fe 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheForClientAccess.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheForClientAccess.java
@@ -1020,9 +1020,9 @@ public class InternalCacheForClientAccess implements InternalCache {
}
@Override
- public void close(String reason, Throwable systemFailureCause, boolean keepAlive,
- boolean keepDS) {
- delegate.close(reason, systemFailureCause, keepAlive, keepDS);
+ public void close(String reason, Throwable systemFailureCause, boolean keepAlive, boolean keepDS,
+ boolean skipAwait) {
+ delegate.close(reason, systemFailureCause, keepAlive, keepDS, skipAwait);
}
@Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
index 930a21c..10ed80f 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
@@ -1253,7 +1253,7 @@ public class CacheCreation implements InternalCache {
@Override
public void close(final String reason, final Throwable systemFailureCause,
- final boolean keepAlive, final boolean keepDS) {
+ final boolean keepAlive, final boolean keepDS, boolean skipAwait) {
throw new UnsupportedOperationException("Should not be invoked");
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplCloseTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplCloseTest.java
index 6ad1d79..abfa4d6 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplCloseTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplCloseTest.java
@@ -14,6 +14,9 @@
*/
package org.apache.geode.internal.cache;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.mockito.ArgumentMatchers.any;
@@ -23,6 +26,9 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Properties;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -34,6 +40,8 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
import org.apache.geode.CancelCriterion;
import org.apache.geode.cache.client.PoolFactory;
@@ -68,6 +76,8 @@ public class GemFireCacheImplCloseTest {
@Rule
public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+ @Rule
+ public MockitoRule mockitoRule = MockitoJUnit.rule();
@Before
public void setUp() {
@@ -93,8 +103,6 @@ public class GemFireCacheImplCloseTest {
.thenReturn(21);
when(replyProcessor21Factory.create(any(), any()))
.thenReturn(replyProcessor21);
-
- gemFireCacheImpl = gemFireCacheImpl(false);
}
@After
@@ -109,12 +117,16 @@ public class GemFireCacheImplCloseTest {
@Test
public void isClosed_returnsFalse_ifCacheExists() {
+ gemFireCacheImpl = gemFireCacheImpl(false);
+
assertThat(gemFireCacheImpl.isClosed())
.isFalse();
}
@Test
public void isClosed_returnsTrue_ifCacheIsClosed() {
+ gemFireCacheImpl = gemFireCacheImpl(false);
+
gemFireCacheImpl.close();
assertThat(gemFireCacheImpl.isClosed())
@@ -123,6 +135,7 @@ public class GemFireCacheImplCloseTest {
@Test
public void close_closesHeapEvictor() {
+ gemFireCacheImpl = gemFireCacheImpl(false);
HeapEvictor heapEvictor = mock(HeapEvictor.class);
gemFireCacheImpl.setHeapEvictor(heapEvictor);
@@ -134,6 +147,7 @@ public class GemFireCacheImplCloseTest {
@Test
public void close_closesOffHeapEvictor() {
+ gemFireCacheImpl = gemFireCacheImpl(false);
OffHeapEvictor offHeapEvictor = mock(OffHeapEvictor.class);
gemFireCacheImpl.setOffHeapEvictor(offHeapEvictor);
@@ -150,6 +164,7 @@ public class GemFireCacheImplCloseTest {
meterRegistry.add(userRegistry);
when(internalDistributedSystem.getMeterRegistry())
.thenReturn(meterRegistry);
+ gemFireCacheImpl = gemFireCacheImpl(false);
gemFireCacheImpl.close();
@@ -163,6 +178,7 @@ public class GemFireCacheImplCloseTest {
*/
@Test
public void close_doesNothingIfAlreadyClosed() {
+ gemFireCacheImpl = gemFireCacheImpl(false);
gemFireCacheImpl.close();
verify(internalDistributedSystem).disconnect();
@@ -173,6 +189,42 @@ public class GemFireCacheImplCloseTest {
verify(internalDistributedSystem).disconnect();
}
+ @Test
+ public void close_blocksUntilFirstCallToCloseCompletes() throws Exception {
+ gemFireCacheImpl = gemFireCacheImpl(false);
+ CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
+ AtomicLong winner = new AtomicLong();
+
+ Future<Long> close1 = executorServiceRule.submit(() -> {
+ synchronized (GemFireCacheImpl.class) {
+ long threadId = Thread.currentThread().getId();
+ cyclicBarrier.await(getTimeout().toMillis(), MILLISECONDS);
+ gemFireCacheImpl.close();
+ winner.compareAndSet(0, threadId);
+ return threadId;
+ }
+ });
+
+ await().until(() -> cyclicBarrier.getNumberWaiting() == 1);
+
+ Future<Long> close2 = executorServiceRule.submit(() -> {
+ long threadId = Thread.currentThread().getId();
+ cyclicBarrier.await(getTimeout().toMillis(), MILLISECONDS);
+ gemFireCacheImpl.close();
+ winner.compareAndSet(0, threadId);
+ return threadId;
+ });
+
+ cyclicBarrier.await(getTimeout().toMillis(), MILLISECONDS);
+
+ long threadId1 = close1.get();
+ long threadId2 = close2.get();
+
+ assertThat(winner.get())
+ .as("ThreadId1=" + threadId1 + " and threadId2=" + threadId2)
+ .isEqualTo(threadId1);
+ }
+
@SuppressWarnings({"SameParameterValue", "LambdaParameterHidesMemberVariable",
"OverlyCoupledMethod", "unchecked"})
private GemFireCacheImpl gemFireCacheImpl(boolean useAsyncEventListeners) {
diff --git a/geode-memcached/src/integrationTest/java/org/apache/geode/memcached/IntegrationJUnitTest.java b/geode-memcached/src/integrationTest/java/org/apache/geode/memcached/IntegrationJUnitTest.java
index abcd797..aa96e12 100644
--- a/geode-memcached/src/integrationTest/java/org/apache/geode/memcached/IntegrationJUnitTest.java
+++ b/geode-memcached/src/integrationTest/java/org/apache/geode/memcached/IntegrationJUnitTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Properties;
@@ -33,6 +32,7 @@ import org.junit.Test;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.inet.LocalHostUtil;
public class IntegrationJUnitTest {
@@ -46,7 +46,7 @@ public class IntegrationJUnitTest {
Cache cache = cf.create();
MemcachedClient client = new MemcachedClient(new ConnectionWithOneMinuteTimeoutFactory(),
- Collections.singletonList(new InetSocketAddress(InetAddress.getLocalHost(), port)));
+ Collections.singletonList(new InetSocketAddress(LocalHostUtil.getLocalHost(), port)));
Future<Boolean> f = client.add("key", 10, "myStringValue");
assertTrue(f.get());
Future<Boolean> f1 = client.add("key1", 10, "myStringValue1");