You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2020/07/02 18:10:53 UTC
[geode] 17/29: Integrate ModuleService into geode-cq.
This is an automated email from the ASF dual-hosted git repository.
udo pushed a commit to branch feature/GEODE-8294
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 97814d86059e5ecdf1bd3dde661f850c98c2027a
Author: Patrick Johnson <pj...@pivotal.io>
AuthorDate: Wed Jun 24 13:24:41 2020 -0700
Integrate ModuleService into geode-cq.
---
.../cache/query/internal/cq/CqServiceProvider.java | 45 +-
.../geode/internal/cache/GemFireCacheImpl.java | 2645 ++++++++++----------
.../internal/cache/GemFireCacheImplCloseTest.java | 2 +-
.../geode/internal/cache/GemFireCacheImplTest.java | 2 +-
4 files changed, 1300 insertions(+), 1394 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
index e61c27b..03ee7a4 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
@@ -16,20 +16,22 @@ package org.apache.geode.cache.query.internal.cq;
import java.io.DataInput;
import java.io.IOException;
-import java.util.Iterator;
-import java.util.ServiceLoader;
+import java.util.Set;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.query.internal.cq.spi.CqServiceFactory;
import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.services.module.ModuleService;
+import org.apache.geode.services.result.ModuleServiceResult;
import org.apache.geode.util.internal.GeodeGlossary;
public class CqServiceProvider {
- @Immutable
- private static final CqServiceFactory factory;
-
+ /**
+ * A debug flag used for testing vMotion during CQ registration
+ */
+ public static final boolean VMOTION_DURING_CQ_REGISTRATION_FLAG = false;
/**
* System property to maintain the CQ event references for optimizing the updates. This will allow
* running the CQ query only once during update events.
@@ -37,24 +39,29 @@ public class CqServiceProvider {
@MutableForTesting
public static boolean MAINTAIN_KEYS = Boolean
.parseBoolean(System.getProperty(GeodeGlossary.GEMFIRE_PREFIX + "cq.MAINTAIN_KEYS", "true"));
+ @Immutable
+ private static CqServiceFactory factory;
- /**
- * A debug flag used for testing vMotion during CQ registration
- */
- public static final boolean VMOTION_DURING_CQ_REGISTRATION_FLAG = false;
+ private CqServiceProvider() {}
- static {
- ServiceLoader<CqServiceFactory> loader = ServiceLoader.load(CqServiceFactory.class);
- Iterator<CqServiceFactory> itr = loader.iterator();
- if (!itr.hasNext()) {
- factory = null;
- } else {
- factory = itr.next();
- factory.initialize();
+ private static void setup(ModuleService moduleService) {
+ if (factory == null) {
+ ModuleServiceResult<Set<CqServiceFactory>> loadServiceResult =
+ moduleService.loadService(CqServiceFactory.class);
+ if (loadServiceResult.isSuccessful()) {
+ for (CqServiceFactory cqServiceFactory : loadServiceResult.getMessage()) {
+ factory = cqServiceFactory;
+ factory.initialize();
+ break;
+ }
+ } else {
+ factory = null;
+ }
}
}
- public static CqService create(InternalCache cache) {
+ public static synchronized CqService create(InternalCache cache, ModuleService moduleService) {
+ setup(moduleService);
if (factory == null) {
return new MissingCqService();
}
@@ -69,6 +76,4 @@ public class CqServiceProvider {
return factory.readCqQuery(in);
}
}
-
- private CqServiceProvider() {}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 38f4b0e..b13c8bc 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -283,43 +283,35 @@ import org.apache.geode.services.result.ModuleServiceResult;
*/
public class GemFireCacheImpl implements InternalCache, InternalClientCache, HasCachePerfStats,
DistributionAdvisee {
- private static final Logger logger = LogService.getLogger();
-
/**
* The default number of seconds to wait for a distributed lock
*/
public static final int DEFAULT_LOCK_TIMEOUT =
Integer.getInteger(GEMFIRE_PREFIX + "Cache.defaultLockTimeout", 60);
-
/**
* The default duration (in seconds) of a lease on a distributed lock
*/
public static final int DEFAULT_LOCK_LEASE =
Integer.getInteger(GEMFIRE_PREFIX + "Cache.defaultLockLease", 120);
-
/**
* The default "copy on read" attribute value
*/
public static final boolean DEFAULT_COPY_ON_READ = false;
-
/**
* The default amount of time to wait for a {@code netSearch} to complete
*/
public static final int DEFAULT_SEARCH_TIMEOUT =
Integer.getInteger(GEMFIRE_PREFIX + "Cache.defaultSearchTimeout", 300);
-
/**
* Name of the default pool.
*/
public static final String DEFAULT_POOL_NAME = "DEFAULT";
-
@VisibleForTesting
static final int EVENT_THREAD_LIMIT =
Integer.getInteger(GEMFIRE_PREFIX + "Cache.EVENT_THREAD_LIMIT", 16);
-
@VisibleForTesting
static final int PURGE_INTERVAL = 1000;
-
+ private static final Logger logger = LogService.getLogger();
/**
* The number of threads that the QueryMonitor will use to mark queries as cancelled (see
* QueryMonitor class for reasons why a query might be cancelled). That processing is very
@@ -352,7 +344,12 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
private static final ThreadLocal<GemFireCacheImpl> xmlCache = new ThreadLocal<>();
private static final ThreadLocal<Thread> CLOSING_THREAD = new ThreadLocal<>();
-
+ /**
+ * The {@code CacheLifecycleListener} s that have been registered in this VM
+ */
+ @MakeNotStatic
+ private static final Set<CacheLifecycleListener> cacheLifecycleListeners =
+ new CopyOnWriteArraySet<>();
/**
* System property to limit the max query-execution time. By default its turned off (-1), the time
* is set in milliseconds.
@@ -360,14 +357,12 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
@MutableForTesting
public static int MAX_QUERY_EXECUTION_TIME =
Integer.getInteger(GEMFIRE_PREFIX + "Cache.MAX_QUERY_EXECUTION_TIME", -1);
-
/**
* Used by unit tests to force cache creation to use a test generated cache.xml
*/
@MutableForTesting
@VisibleForTesting
public static File testCacheXml;
-
/**
* If true then when a delta is applied the size of the entry value will be recalculated. If false
* (the default) then the size of the entry value is unchanged by a delta application. Not a final
@@ -378,14 +373,6 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
@MutableForTesting
static boolean DELTAS_RECALCULATE_SIZE =
Boolean.getBoolean(GEMFIRE_PREFIX + "DELTAS_RECALCULATE_SIZE");
-
- /**
- * The {@code CacheLifecycleListener} s that have been registered in this VM
- */
- @MakeNotStatic
- private static final Set<CacheLifecycleListener> cacheLifecycleListeners =
- new CopyOnWriteArraySet<>();
-
/**
* Property set to true if resource manager heap percentage is set and query monitor is required
*/
@@ -398,78 +385,69 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
@MakeNotStatic
private static String defaultDiskStoreName = DiskStoreFactory.DEFAULT_DISK_STORE_NAME;
+ static {
+ // this works around jdk bug 6427854
+ String propertyName = "sun.nio.ch.bugLevel";
+ String value = System.getProperty(propertyName);
+ if (value == null) {
+ System.setProperty(propertyName, "");
+ }
+ }
+
/**
* System property to disable query monitor even if resource manager is in use
*/
private final boolean queryMonitorDisabledForLowMem =
Boolean.getBoolean(GEMFIRE_PREFIX + "Cache.DISABLE_QUERY_MONITOR_FOR_LOW_MEMORY");
-
private final InternalDistributedSystem system;
-
private final DistributionManager dm;
-
private final ConcurrentMap<String, InternalRegion> rootRegions;
-
/**
* True if this cache is being created by a ClientCacheFactory.
*/
private final boolean isClient;
-
private final PoolFactory poolFactory;
-
private final ConcurrentMap<String, InternalRegion> pathToRegion = new ConcurrentHashMap<>();
-
private final CachePerfStats cachePerfStats;
-
/**
* Date on which this instances was created
*/
private final Date creationDate;
-
/**
* Thread pool for event dispatching
*/
private final ExecutorService eventThreadPool;
-
/**
* List of all cache servers. CopyOnWriteArrayList is used to allow concurrent add, remove and
* retrieval operations. It is assumed that the traversal operations on cache servers list vastly
* outnumber the mutative operations such as add, remove.
*/
private final List<InternalCacheServer> allCacheServers = new CopyOnWriteArrayList<>();
-
/**
* Unmodifiable view of "allCacheServers".
*/
private final List<CacheServer> unmodifiableAllCacheServers = unmodifiableList(allCacheServers);
-
/**
* Controls updates to the list of all gateway senders {@link #allGatewaySenders}.
*/
private final Object allGatewaySendersLock = new Object();
-
/**
* List of all async event queues added to the cache. CopyOnWriteArrayList is used to allow
* concurrent add, remove and retrieval operations.
*/
private final Set<AsyncEventQueue> allVisibleAsyncEventQueues = new CopyOnWriteArraySet<>();
-
/**
* List of all async event queues added to the cache. CopyOnWriteArrayList is used to allow
* concurrent add, remove and retrieval operations.
*/
private final Set<AsyncEventQueue> allAsyncEventQueues = new CopyOnWriteArraySet<>();
-
private final AtomicReference<GatewayReceiver> gatewayReceiver = new AtomicReference<>();
-
private final AtomicReference<InternalCacheServer> gatewayReceiverServer =
new AtomicReference<>();
-
/**
* PartitionedRegion instances (for required-events notification
*/
private final Set<PartitionedRegion> partitionedRegions = new HashSet<>();
-
/**
* Map of regions that are in the process of being destroyed. We could potentially leave the
* regions in the pathToRegion map, but that would entail too many changes at this point in the
@@ -479,101 +457,69 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
*/
private final ConcurrentMap<String, DistributedRegion> regionsInDestroy =
new ConcurrentHashMap<>();
-
private final Object allGatewayHubsLock = new Object();
-
/**
* Transaction manager for this cache.
*/
private final TXManagerImpl transactionManager;
-
/**
* Named region attributes registered with this cache.
*/
private final Map<String, RegionAttributes<?, ?>> namedRegionAttributes =
synchronizedMap(new HashMap<>());
-
/**
* System timer task for cleaning up old bridge thread event entries.
*/
private final EventTrackerExpiryTask recordedEventSweeper;
-
private final TombstoneService tombstoneService;
-
/**
* Synchronization mutex for prLockService.
*/
private final Object prLockServiceLock = new Object();
-
/**
* Synchronization mutex for gatewayLockService.
*/
private final Object gatewayLockServiceLock = new Object();
-
private final InternalResourceManager resourceManager;
-
private final BackupService backupService;
-
private final Object heapEvictorLock = new Object();
-
private final Object offHeapEvictorLock = new Object();
-
private final Object queryMonitorLock = new Object();
-
private final PersistentMemberManager persistentMemberManager;
-
private final ClientMetadataService clientMetadataService;
-
private final AtomicBoolean isShutDownAll = new AtomicBoolean();
private final CountDownLatch shutDownAllFinished = new CountDownLatch(1);
-
private final ResourceAdvisor resourceAdvisor;
private final JmxManagerAdvisor jmxAdvisor;
-
private final int serialNumber;
-
private final TXEntryStateFactory txEntryStateFactory;
-
private final CacheConfig cacheConfig;
-
private final DiskStoreMonitor diskMonitor;
-
/**
* Stores the properties used to initialize declarables.
*/
private final Map<Declarable, Properties> declarablePropertiesMap = new ConcurrentHashMap<>();
-
/**
* Resolves ${} type property strings.
*/
private final PropertyResolver resolver;
-
/**
* @since GemFire 8.1
*/
private final ExtensionPoint<Cache> extensionPoint = new SimpleExtensionPoint<>(this, this);
-
private final CqService cqService;
-
private final Set<RegionListener> regionListeners = ConcurrentHashMap.newKeySet();
-
private final Map<Class<? extends CacheService>, CacheService> services = new HashMap<>();
-
private final SecurityService securityService;
-
private final Set<RegionEntrySynchronizationListener> synchronizationListeners =
ConcurrentHashMap.newKeySet();
-
private final ClusterConfigurationLoader ccLoader = new ClusterConfigurationLoader();
-
private final StatisticsClock statisticsClock;
-
/**
* Map of futures used to track regions that are being reinitialized.
*/
private final ConcurrentMap<String, FutureResult<InternalRegion>> reinitializingRegions =
new ConcurrentHashMap<>();
-
private final HeapEvictorFactory heapEvictorFactory;
private final Runnable typeRegistryClose;
private final Function<InternalCache, String> typeRegistryGetPdxDiskStoreName;
@@ -582,328 +528,158 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
private final Consumer<org.apache.geode.cache.execute.Function> functionServiceRegisterFunction;
private final Function<Object, SystemTimer> systemTimerFactory;
private final ReplyProcessor21Factory replyProcessor21Factory;
-
private final Stopper stopper = new Stopper();
-
private final boolean disableDisconnectDsOnCacheClose =
Boolean.getBoolean(GEMFIRE_PREFIX + "DISABLE_DISCONNECT_DS_ON_CACHE_CLOSE");
-
private final ConcurrentMap<String, DiskStoreImpl> diskStores = new ConcurrentHashMap<>();
-
private final ConcurrentMap<String, DiskStoreImpl> regionOwnedDiskStores =
new ConcurrentHashMap<>();
-
/**
* Synchronization mutex for {@link #ccpTimer}.
*/
private final Object ccpTimerMutex = new Object();
-
private final ExpirationScheduler expirationScheduler;
-
/**
* TODO make this a simple int guarded by riWaiters and get rid of the double-check
*/
private final AtomicInteger registerInterestsInProgress = new AtomicInteger();
-
private final List<SimpleWaiter> riWaiters = new ArrayList<>();
-
private final InternalCacheForClientAccess cacheForClients =
new InternalCacheForClientAccess(this);
-
+ private final CountDownLatch isClosedLatch = new CountDownLatch(1);
+ private final ModuleService modulesService;
private volatile ConfigurationResponse configurationResponse;
-
private volatile boolean isInitialized;
-
private volatile boolean isClosing;
-
- private final CountDownLatch isClosedLatch = new CountDownLatch(1);
-
/**
* Set of all gateway senders. It may be fetched safely (for enumeration), but updates must by
* synchronized via {@link #allGatewaySendersLock}
*/
private volatile Set<GatewaySender> allGatewaySenders = emptySet();
-
/**
* Copy on Read feature for all read operations.
*/
private volatile boolean copyOnRead = DEFAULT_COPY_ON_READ;
-
/**
* Reason this cache was forced to close due to a forced-disconnect or system failure.
*/
private volatile Throwable disconnectCause;
-
/**
* DistributedLockService for GatewaySenders. Remains null until the first GatewaySender is
* created. Destroyed by GemFireCache when closing the cache. Guarded by gatewayLockServiceLock.
*/
private volatile DistributedLockService gatewayLockService;
-
private volatile QueryMonitor queryMonitor;
-
/**
* Not final to allow cache.xml parsing to set it.
*/
private Pool defaultPool;
-
/**
* Amount of time (in seconds) to wait for a distributed lock
*/
private int lockTimeout = DEFAULT_LOCK_TIMEOUT;
-
/**
* Amount of time a lease of a distributed lock lasts
*/
private int lockLease = DEFAULT_LOCK_LEASE;
-
/**
* Amount of time to wait for a {@code netSearch} to complete
*/
private int searchTimeout = DEFAULT_SEARCH_TIMEOUT;
-
/**
* Conflict resolver for WAN, if any. Guarded by {@link #allGatewayHubsLock}.
*/
private GatewayConflictResolver gatewayConflictResolver;
-
/**
* True if this is a server cache.
*/
private boolean isServer;
-
private RestAgent restAgent;
-
private boolean isRESTServiceRunning;
-
/**
* True if this cache was forced to close due to a forced-disconnect.
*/
private boolean forcedDisconnect;
-
/**
* Context where this cache was created for debugging.
*/
private Exception creationStack;
-
/**
* DistributedLockService for PartitionedRegions. Remains null until the first PartitionedRegion
* is created. Destroyed by GemFireCache when closing the cache. Protected by synchronization on
* prLockService. Guarded by prLockServiceLock.
*/
private DistributedLockService prLockService;
-
private HeapEvictor heapEvictor;
-
private OffHeapEvictor offHeapEvictor;
-
private ResourceEventsListener resourceEventsListener;
-
/**
* Set to true during a cache close if user requested durable subscriptions to be kept.
*
* @since GemFire 5.7
*/
private boolean keepAlive;
-
/**
* Timer for {@link CacheClientProxy}. Guarded by {@link #ccpTimerMutex}.
*/
private SystemTimer ccpTimer;
-
private int cancelCount;
-
/**
* Some tests pass value in via constructor. Product code sets value after construction.
*/
private TypeRegistry pdxRegistry;
-
private Declarable initializer;
-
private Properties initializerProps;
-
private List<File> backupFiles = emptyList();
-
private ConcurrentMap<String, CountDownLatch> diskStoreLatches = new ConcurrentHashMap();
- private final ModuleService modulesService;
-
- static {
- // this works around jdk bug 6427854
- String propertyName = "sun.nio.ch.bugLevel";
- String value = System.getProperty(propertyName);
- if (value == null) {
- System.setProperty(propertyName, "");
- }
- }
-
- /**
- * Invokes mlockall(). Locks all pages mapped into the address space of the calling process. This
- * includes the pages of the code, data and stack segment, as well as shared libraries, user space
- * kernel data, shared memory, and memory-mapped files. All mapped pages are guaranteed to be
- * resident in RAM when the call returns successfully; the pages are guaranteed to stay in RAM
- * until later unlocked.
- *
- * @param flags MCL_CURRENT 1 - Lock all pages which are currently mapped into the address space
- * of the process.
- *
- * MCL_FUTURE 2 - Lock all pages which will become mapped into the address space of the
- * process in the future. These could be for instance new pages required by a growing heap
- * and stack as well as new memory mapped files or shared memory regions.
- *
- * @return 0 if success, non-zero if error and errno set
- */
- private static native int mlockall(int flags);
-
- public static void lockMemory() {
- try {
- Native.register(Platform.C_LIBRARY_NAME);
- int result = mlockall(1);
- if (result == 0) {
- return;
- }
- } catch (Throwable t) {
- throw new IllegalStateException("Error trying to lock memory", t);
- }
-
- int lastError = Native.getLastError();
- String message = "mlockall failed: " + lastError;
- if (lastError == 1 || lastError == 12) {
- // if EPERM || ENOMEM
- message = "Unable to lock memory due to insufficient free space or privileges. "
- + "Please check the RLIMIT_MEMLOCK soft resource limit (ulimit -l) and "
- + "increase the available memory if needed";
- }
- throw new IllegalStateException(message);
- }
-
/**
- * Returns the last created instance of GemFireCache.
+ * Creates a new instance of GemFireCache and populates it according to the {@code cache.xml}, if
+ * appropriate.
*
- * @deprecated use DM.getCache instead
+ * <p>
+ * Currently only unit tests set the typeRegistry parameter to a non-null value
*/
- @Deprecated
- public static GemFireCacheImpl getInstance() {
- InternalDistributedSystem system = getAnyInstance();
- if (system == null) {
- return null;
- }
-
- GemFireCacheImpl cache = (GemFireCacheImpl) system.getCache();
- if (cache == null) {
- return null;
- }
-
- if (cache.isClosing) {
- return null;
- }
-
- return cache;
- }
-
- /**
- * Returns an existing instance. If a cache does not exist throws a cache closed exception.
- *
- * @return the existing cache
- * @throws CacheClosedException if an existing cache can not be found.
- * @deprecated use DM.getExistingCache instead.
- */
- @Deprecated
- public static GemFireCacheImpl getExisting() {
- GemFireCacheImpl result = getInstance();
- if (result != null && !result.isClosing) {
- return result;
- }
- if (result != null) {
- throw result.getCacheClosedException("The cache has been closed.");
- }
- throw new CacheClosedException("A cache has not yet been created.");
- }
-
- /**
- * Returns an existing instance. If a cache does not exist throws an exception.
- *
- * @param reason the reason an existing cache is being requested.
- * @return the existing cache
- * @throws CacheClosedException if an existing cache can not be found.
- * @deprecated Please use {@link DistributionManager#getExistingCache()} instead.
- */
- @Deprecated
- public static GemFireCacheImpl getExisting(String reason) {
- GemFireCacheImpl result = getInstance();
- if (result == null) {
- throw new CacheClosedException(reason);
- }
- return result;
- }
-
- /**
- * Returns instance to pdx even while it is being closed.
- *
- * @deprecated Please use a cache that is passed to your method instead.
- */
- @Deprecated
- public static GemFireCacheImpl getForPdx(String reason) {
- InternalDistributedSystem system = getAnyInstance();
- if (system == null) {
- throw new CacheClosedException(reason);
- }
-
- GemFireCacheImpl cache = (GemFireCacheImpl) system.getCache();
- if (cache == null) {
- throw new CacheClosedException(reason);
- }
-
- return cache;
- }
-
- /**
- * Creates a new instance of GemFireCache and populates it according to the {@code cache.xml}, if
- * appropriate.
- *
- * <p>
- * Currently only unit tests set the typeRegistry parameter to a non-null value
- */
- GemFireCacheImpl(boolean isClient, PoolFactory poolFactory,
- InternalDistributedSystem internalDistributedSystem, CacheConfig cacheConfig,
- boolean useAsyncEventListeners, TypeRegistry typeRegistry) {
- this(isClient,
- poolFactory,
- internalDistributedSystem,
- cacheConfig,
- useAsyncEventListeners,
- typeRegistry,
- JNDIInvoker::mapTransactions,
- SecurityServiceFactory::create,
- () -> PoolManager.getAll().isEmpty(),
- ManagementListener::new,
- CqServiceProvider::create,
- CachePerfStats::new,
- TXManagerImpl::new,
- PersistentMemberManager::new,
- ResourceAdvisor::createResourceAdvisor,
- JmxManagerAdvisee::new,
- JmxManagerAdvisor::createJmxManagerAdvisor,
- InternalResourceManager::createResourceManager,
- DistributionAdvisor::createSerialNumber,
- HeapEvictor::new,
- TypeRegistry::init,
- TypeRegistry::open,
- TypeRegistry::close,
- TypeRegistry::getPdxDiskStoreName,
- TypeRegistry::setPdxSerializer,
- TypeRegistry::new,
- HARegionQueue::setMessageSyncInterval,
- FunctionService::registerFunction,
- object -> new SystemTimer((DistributedSystem) object),
- TombstoneService::initialize,
- ExpirationScheduler::new,
- DiskStoreMonitor::new,
- GatewaySenderQueueEntrySynchronizationListener::new,
- BackupService::new,
- ClientMetadataService::new,
- TXEntryState.getFactory(),
- ReplyProcessor21::new);
+ GemFireCacheImpl(boolean isClient, PoolFactory poolFactory,
+ InternalDistributedSystem internalDistributedSystem, CacheConfig cacheConfig,
+ boolean useAsyncEventListeners, TypeRegistry typeRegistry) {
+ this(isClient,
+ poolFactory,
+ internalDistributedSystem,
+ cacheConfig,
+ useAsyncEventListeners,
+ typeRegistry,
+ JNDIInvoker::mapTransactions,
+ SecurityServiceFactory::create,
+ () -> PoolManager.getAll().isEmpty(),
+ ManagementListener::new,
+ CqServiceProvider::create,
+ CachePerfStats::new,
+ TXManagerImpl::new,
+ PersistentMemberManager::new,
+ ResourceAdvisor::createResourceAdvisor,
+ JmxManagerAdvisee::new,
+ JmxManagerAdvisor::createJmxManagerAdvisor,
+ InternalResourceManager::createResourceManager,
+ DistributionAdvisor::createSerialNumber,
+ HeapEvictor::new,
+ TypeRegistry::init,
+ TypeRegistry::open,
+ TypeRegistry::close,
+ TypeRegistry::getPdxDiskStoreName,
+ TypeRegistry::setPdxSerializer,
+ TypeRegistry::new,
+ HARegionQueue::setMessageSyncInterval,
+ FunctionService::registerFunction,
+ object -> new SystemTimer((DistributedSystem) object),
+ TombstoneService::initialize,
+ ExpirationScheduler::new,
+ DiskStoreMonitor::new,
+ GatewaySenderQueueEntrySynchronizationListener::new,
+ BackupService::new,
+ ClientMetadataService::new,
+ TXEntryState.getFactory(),
+ ReplyProcessor21::new);
}
@VisibleForTesting
@@ -917,7 +693,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
InternalSecurityServiceFactory securityServiceFactory,
Supplier<Boolean> isPoolManagerEmpty,
Function<InternalDistributedSystem, ManagementListener> managementListenerFactory,
- Function<InternalCache, CqService> cqServiceFactory,
+ InternalCqServiceFactory cqServiceFactory,
CachePerfStatsFactory cachePerfStatsFactory,
TXManagerImplFactory txManagerImplFactory,
Supplier<PersistentMemberManager> persistentMemberManagerFactory,
@@ -1007,7 +783,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
rootRegions = new ConcurrentHashMap<>();
- cqService = cqServiceFactory.apply(this);
+ cqService = cqServiceFactory.create(this, modulesService);
// Create the CacheStatistics
statisticsClock = StatisticsClockFactory.clock(system.getConfig().getEnableTimeStatistics());
@@ -1094,235 +870,124 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
clientMetadataService = clientMetadataServiceFactory.apply(this);
}
- @Override
- public void lockDiskStore(String diskStoreName) {
- doLockDiskStore(diskStoreName);
- }
-
/**
- * If the disk store is not associated with a {@code CountDownLatch},
- * constructs a {@code CountDownLatch} with count one, and associate it with the disk store;
- * otherwise wait until the associated {@code CountDownLatch} has counted down to zero.
+ * Invokes mlockall(). Locks all pages mapped into the address space of the calling process. This
+ * includes the pages of the code, data and stack segment, as well as shared libraries, user space
+ * kernel data, shared memory, and memory-mapped files. All mapped pages are guaranteed to be
+ * resident in RAM when the call returns successfully; the pages are guaranteed to stay in RAM
+ * until later unlocked.
*
- * @param diskStoreName the name of the disk store
- * @return {@code true} if it does not call {@code CountDownLatch.await()};
- * {@code false} otherwise.
+ * @param flags MCL_CURRENT 1 - Lock all pages which are currently mapped into the address space
+ * of the process.
+ *
+ * MCL_FUTURE 2 - Lock all pages which will become mapped into the address space of the
+ * process in the future. These could be for instance new pages required by a growing heap
+ * and stack as well as new memory mapped files or shared memory regions.
+ *
+ * @return 0 if success, non-zero if error and errno set
*/
- @VisibleForTesting
- boolean doLockDiskStore(String diskStoreName) {
- CountDownLatch countDownLatch =
- diskStoreLatches.putIfAbsent(diskStoreName, new CountDownLatch(1));
- if (countDownLatch != null) {
- try {
- countDownLatch.await();
- return false;
- } catch (InterruptedException e) {
- throw new InternalGemFireError(e);
+ private static native int mlockall(int flags);
+
+ public static void lockMemory() {
+ try {
+ Native.register(Platform.C_LIBRARY_NAME);
+ int result = mlockall(1);
+ if (result == 0) {
+ return;
}
+ } catch (Throwable t) {
+ throw new IllegalStateException("Error trying to lock memory", t);
}
- return true;
- }
- @Override
- public void unlockDiskStore(String diskStoreName) {
- doUnlockDiskStore(diskStoreName);
+ int lastError = Native.getLastError();
+ String message = "mlockall failed: " + lastError;
+ if (lastError == 1 || lastError == 12) {
+ // if EPERM || ENOMEM
+ message = "Unable to lock memory due to insufficient free space or privileges. "
+ + "Please check the RLIMIT_MEMLOCK soft resource limit (ulimit -l) and "
+ + "increase the available memory if needed";
+ }
+ throw new IllegalStateException(message);
}
/**
- * Decrements the count of the {@code CountDownLatch} associated with the disk store.
+ * Returns the last created instance of GemFireCache.
*
- * @param diskStoreName the name of the disk store
- * @return {@code true} if the disk store associated {@code CountDownLatch} has decremented;
- * {@code false} otherwise.
+ * @deprecated use DM.getCache instead
*/
- @VisibleForTesting
- boolean doUnlockDiskStore(String diskStoreName) {
- CountDownLatch countDownLatch = diskStoreLatches.get(diskStoreName);
- if (countDownLatch != null) {
- countDownLatch.countDown();
- return true;
+ @Deprecated
+ public static GemFireCacheImpl getInstance() {
+ InternalDistributedSystem system = getAnyInstance();
+ if (system == null) {
+ return null;
}
- return false;
+
+ GemFireCacheImpl cache = (GemFireCacheImpl) system.getCache();
+ if (cache == null) {
+ return null;
+ }
+
+ if (cache.isClosing) {
+ return null;
+ }
+
+ return cache;
}
/**
- * This is for debugging cache-open issues such as {@link CacheExistsException}.
+ * Returns an existing instance. If a cache does not exist throws a cache closed exception.
+ *
+ * @return the existing cache
+ * @throws CacheClosedException if an existing cache can not be found.
+ * @deprecated use DM.getExistingCache instead.
*/
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("GemFireCache[");
- sb.append("id = ").append(System.identityHashCode(this));
- sb.append("; isClosing = ").append(isClosing);
- sb.append("; isShutDownAll = ").append(isCacheAtShutdownAll());
- sb.append("; created = ").append(creationDate);
- sb.append("; server = ").append(isServer);
- sb.append("; copyOnRead = ").append(copyOnRead);
- sb.append("; lockLease = ").append(lockLease);
- sb.append("; lockTimeout = ").append(lockTimeout);
- if (creationStack != null) {
- sb.append(lineSeparator()).append("Creation context:").append(lineSeparator());
- sb.append(getStackTrace(creationStack));
+ @Deprecated
+ public static GemFireCacheImpl getExisting() {
+ GemFireCacheImpl result = getInstance();
+ if (result != null && !result.isClosing) {
+ return result;
}
- sb.append("]");
- return sb.toString();
+ if (result != null) {
+ throw result.getCacheClosedException("The cache has been closed.");
+ }
+ throw new CacheClosedException("A cache has not yet been created.");
}
- @Override
- public void throwCacheExistsException() {
- throw new CacheExistsException(this, String.format("%s: An open cache already exists.", this),
- creationStack);
+ /**
+ * Returns an existing instance. If a cache does not exist throws an exception.
+ *
+ * @param reason the reason an existing cache is being requested.
+ * @return the existing cache
+ * @throws CacheClosedException if an existing cache can not be found.
+ * @deprecated Please use {@link DistributionManager#getExistingCache()} instead.
+ */
+ @Deprecated
+ public static GemFireCacheImpl getExisting(String reason) {
+ GemFireCacheImpl result = getInstance();
+ if (result == null) {
+ throw new CacheClosedException(reason);
+ }
+ return result;
}
- @Override
- public MeterRegistry getMeterRegistry() {
- return system.getMeterRegistry();
- }
+ /**
+ * Returns instance to pdx even while it is being closed.
+ *
+ * @deprecated Please use a cache that is passed to your method instead.
+ */
+ @Deprecated
+ public static GemFireCacheImpl getForPdx(String reason) {
+ InternalDistributedSystem system = getAnyInstance();
+ if (system == null) {
+ throw new CacheClosedException(reason);
+ }
- @Override
- public void saveCacheXmlForReconnect() {
- // there are two versions of this method so it can be unit-tested
- boolean sharedConfigEnabled = getDistributionManager().getConfig().getUseSharedConfiguration();
+ GemFireCacheImpl cache = (GemFireCacheImpl) system.getCache();
+ if (cache == null) {
+ throw new CacheClosedException(reason);
+ }
- if (!Boolean.getBoolean(GEMFIRE_PREFIX + "autoReconnect-useCacheXMLFile")
- && !sharedConfigEnabled) {
- try {
- logger.info("generating XML to rebuild the cache after reconnect completes");
- StringPrintWriter pw = new StringPrintWriter();
- CacheXmlGenerator.generate((Cache) this, pw, false);
- String cacheXML = pw.toString();
- getCacheConfig().setCacheXMLDescription(cacheXML);
- logger.info("XML generation completed: {}", cacheXML);
- } catch (CancelException e) {
- logger.info("Unable to generate XML description for reconnect of cache due to exception",
- e);
- }
- } else if (sharedConfigEnabled && !getCacheServers().isEmpty()) {
- // we need to retain a cache-server description if this JVM was started by gfsh
- List<CacheServerCreation> list = new ArrayList<>(getCacheServers().size());
- for (Object o : getCacheServers()) {
- CacheServerImpl cs = (CacheServerImpl) o;
- if (cs.isDefaultServer()) {
- CacheServerCreation bsc = new CacheServerCreation(this, cs);
- list.add(bsc);
- }
- }
- getCacheConfig().setCacheServerCreation(list);
- logger.info("CacheServer configuration saved");
- }
- }
-
- @Override
- public void reLoadClusterConfiguration() throws IOException, ClassNotFoundException {
- configurationResponse = requestSharedConfiguration();
- if (configurationResponse != null) {
- ccLoader.deployJarsReceivedFromClusterConfiguration(configurationResponse);
- ccLoader.applyClusterPropertiesConfiguration(configurationResponse, system.getConfig());
- ccLoader.applyClusterXmlConfiguration(this, configurationResponse,
- system.getConfig().getGroups());
- initializeDeclarativeCache();
- }
- }
-
- /**
- * Initialize the EventTracker's timer task for tracking and shutdown purposes.
- */
- private EventTrackerExpiryTask createEventTrackerExpiryTask() {
- long lifetimeInMillis = Long.getLong(GEMFIRE_PREFIX + "messageTrackingTimeout",
- PoolFactory.DEFAULT_SUBSCRIPTION_MESSAGE_TRACKING_TIMEOUT / 3);
- EventTrackerExpiryTask task = new EventTrackerExpiryTask(lifetimeInMillis);
- getCCPTimer().scheduleAtFixedRate(task, lifetimeInMillis, lifetimeInMillis);
- return task;
- }
-
- @Override
- public SecurityService getSecurityService() {
- return securityService;
- }
-
- @Override
- public boolean isRESTServiceRunning() {
- return isRESTServiceRunning;
- }
-
- @Override
- public void setRESTServiceRunning(boolean isRESTServiceRunning) {
- this.isRESTServiceRunning = isRESTServiceRunning;
- }
-
- @Override
- @VisibleForTesting
- public RestAgent getRestAgent() {
- return restAgent;
- }
-
- /**
- * Request the cluster configuration from the locator(s) if cluster config service is running.
- */
- @VisibleForTesting
- ConfigurationResponse requestSharedConfiguration() {
- DistributionConfig config = system.getConfig();
-
- if (!(dm instanceof ClusterDistributionManager)) {
- return null;
- }
-
- // do nothing if this vm is/has locator or this is a client
- if (dm.getDMType() == LOCATOR_DM_TYPE || isClient || Locator.getLocator() != null) {
- return null;
- }
-
- // can't simply return null if server is not using shared configuration, since we need to find
- // out if the locator is running in secure mode or not, if yes, then we need to throw an
- // exception if server is not using cluster config.
-
- Map<InternalDistributedMember, Collection<String>> locatorsWithClusterConfig =
- getDistributionManager().getAllHostedLocatorsWithSharedConfiguration();
-
- // If there are no locators with Shared configuration, that means the system has been started
- // without shared configuration then do not make requests to the locators.
- if (locatorsWithClusterConfig.isEmpty()) {
- logger.info("No locator(s) found with cluster configuration service");
- return null;
- }
-
- try {
- ConfigurationResponse response = ccLoader.requestConfigurationFromLocators(
- system.getConfig().getGroups(), locatorsWithClusterConfig.keySet());
-
- // log the configuration received from the locator
- logger.info("Received cluster configuration from the locator");
- logger.info(response.describeConfig());
-
- Configuration clusterConfig = response.getRequestedConfiguration().get(CLUSTER_CONFIG);
- Properties clusterSecProperties =
- clusterConfig == null ? new Properties() : clusterConfig.getGemfireProperties();
-
- // If not using shared configuration, return null or throw an exception is locator is secured
- if (!config.getUseSharedConfiguration()) {
- if (clusterSecProperties.containsKey(SECURITY_MANAGER)) {
- throw new GemFireConfigException(
- "A server must use cluster configuration when joining a secured cluster.");
- }
- logger.info(
- "The cache has been created with use-cluster-configuration=false. It will not receive any cluster configuration");
- return null;
- }
-
- Properties serverSecProperties = config.getSecurityProps();
- // check for possible mis-configuration
- if (isMisConfigured(clusterSecProperties, serverSecProperties, SECURITY_MANAGER)
- || isMisConfigured(clusterSecProperties, serverSecProperties, SECURITY_POST_PROCESSOR)) {
- throw new GemFireConfigException(
- "A server cannot specify its own security-manager or security-post-processor when using cluster configuration");
- }
- return response;
-
- } catch (ClusterConfigurationNotAvailableException e) {
- throw new GemFireConfigException("cluster configuration service not available", e);
- } catch (UnknownHostException e) {
- throw new GemFireConfigException(e.getLocalizedMessage(), e);
- }
+ return cache;
}
/**
@@ -1351,16 +1016,6 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
return !clusterPropValue.equals(serverPropValue);
}
- @Override
- public boolean isClient() {
- return isClient;
- }
-
- @Override
- public boolean hasPool() {
- return isClient || !getAllPools().isEmpty();
- }
-
private static Collection<Pool> getAllPools() {
Collection<Pool> pools = PoolManagerImpl.getPMI().getMap().values();
for (Iterator<Pool> itr = pools.iterator(); itr.hasNext();) {
@@ -1372,294 +1027,982 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
return pools;
}
- @Override
- public synchronized Pool getDefaultPool() {
- if (defaultPool == null) {
- determineDefaultPool();
- }
- return defaultPool;
- }
+ private static void logCacheXML(URL url, String cacheXmlDescription) {
+ if (cacheXmlDescription == null) {
+ StringBuilder sb = new StringBuilder();
+ BufferedReader br = null;
+ try {
+ br = new BufferedReader(new InputStreamReader(url.openStream()));
+ String line = br.readLine();
+ while (line != null) {
+ if (!line.isEmpty()) {
+ sb.append(lineSeparator()).append(line);
+ }
+ line = br.readLine();
+ }
+ } catch (IOException ignore) {
+ } finally {
+ closeQuietly(br);
+ }
+ logger.info("Initializing cache using {}:{}", url, sb);
- @Override
- public void initialize() {
- for (CacheLifecycleListener listener : cacheLifecycleListeners) {
- listener.cacheCreated(this);
+ } else {
+ logger.info("Initializing cache using {}:{}", "generated description from old cache",
+ cacheXmlDescription);
}
+ }
- if (isClient()) {
- initializeClientRegionShortcuts(this);
- } else {
- initializeRegionShortcuts(this);
+ /**
+ * Close the distributed system, cache servers, and gateways. Clears the rootRegions and
+ * partitionedRegions map. Marks the cache as closed.
+ *
+ * @see SystemFailure#emergencyClose()
+ */
+ public static void emergencyClose() {
+ GemFireCacheImpl cache = getInstance();
+ if (cache == null) {
+ return;
}
- // set ClassPathLoader and then deploy cluster config jars
- ClassPathLoader.setLatestToDefault(system.getConfig().getDeployWorkingDir());
+ // leave the PdxSerializer set if we have one
- try {
- ccLoader.deployJarsReceivedFromClusterConfiguration(configurationResponse);
- } catch (IOException | ClassNotFoundException e) {
- throw new GemFireConfigException(
- "Exception while deploying the jars received as a part of cluster Configuration",
- e);
+ // Shut down messaging first
+ InternalDistributedSystem ids = cache.system;
+ if (ids != null) {
+ ids.emergencyClose();
}
- SystemMemberCacheEventProcessor.send(this, Operation.CACHE_CREATE);
- resourceAdvisor.initializationGate();
+ cache.disconnectCause = SystemFailure.getFailure();
+ cache.isClosing = true;
- // Register function that we need to execute to fetch available REST service endpoints in DS
- functionServiceRegisterFunction.accept(new FindRestEnabledServersFunction());
+ for (InternalCacheServer cacheServer : cache.allCacheServers) {
+ Acceptor acceptor = cacheServer.getAcceptor();
+ if (acceptor != null) {
+ acceptor.emergencyClose();
+ }
+ }
- // Entry to GemFire Management service
- jmxAdvisor.initializationGate();
+ closeGateWayReceiverServers(cache);
- initializeServices();
+ PoolManagerImpl.emergencyClose();
- // This starts up the ManagementService, registers and federates the internal beans. Since it
- // may be starting up web services, it relies on the prior step which would have started the
- // HttpService.
- system.handleResourceEvent(ResourceEvent.CACHE_CREATE, this);
+ // rootRegions is intentionally *not* synchronized. The
+ // implementation of clear() does not currently allocate objects.
+ cache.rootRegions.clear();
- // Resource events, generated for started services. These events may depend on the prior
- // CACHE_CREATE event which is why they are split out separately.
- handleResourceEventsForCacheServices();
+ // partitionedRegions is intentionally *not* synchronized, The
+ // implementation of clear() does not currently allocate objects.
+ cache.partitionedRegions.clear();
+ }
- boolean completedCacheXml = false;
- try {
- if (!isClient) {
- applyJarAndXmlFromClusterConfig();
- }
- initializeDeclarativeCache();
- completedCacheXml = true;
- } catch (RuntimeException e) {
- logger.error("Cache initialization for " + this.toString() + " failed because:", e);
- throw e;
- } finally {
- if (!completedCacheXml) {
- // so initializeDeclarativeCache threw an exception
- try {
- close();
- } catch (Throwable ignore) {
- // I don't want init to throw an exception that came from the close.
- // I want it to throw the original exception that came from initializeDeclarativeCache.
- }
- configurationResponse = null;
+ private static void closeGateWayReceiverServers(GemFireCacheImpl cache) {
+ InternalCacheServer receiverServer = cache.gatewayReceiverServer.get();
+ if (receiverServer != null) {
+ Acceptor acceptor = receiverServer.getAcceptor();
+ if (acceptor != null) {
+ acceptor.emergencyClose();
}
}
+ }
- boolean completedHandleResourceEvent = false;
- try {
- system.handleResourceEvent(ResourceEvent.CLUSTER_CONFIGURATION_APPLIED, this);
- completedHandleResourceEvent = true;
- } finally {
- if (!completedHandleResourceEvent) {
- try {
- close();
- } catch (Throwable ignore) {
- // We don't want to throw an exception that came from the close
- // so throw original exception from handleResourceEvent.
- }
+ private static Map<InternalDistributedMember, PersistentMemberID> getSubMapForLiveMembers(
+ Set<InternalDistributedMember> membersToPersistOfflineEqual,
+ Map<InternalDistributedMember, PersistentMemberID> bucketMap) {
+ if (bucketMap == null) {
+ return null;
+ }
+ Map<InternalDistributedMember, PersistentMemberID> persistMap = new HashMap<>();
+ for (InternalDistributedMember member : membersToPersistOfflineEqual) {
+ if (bucketMap.containsKey(member)) {
+ persistMap.put(member, bucketMap.get(member));
}
}
+ return persistMap;
+ }
- startColocatedJmxManagerLocator();
-
- startRestAgentServer(this);
-
- isInitialized = true;
+ public static String getDefaultDiskStoreName() {
+ return defaultDiskStoreName;
}
+ /**
+ * Used by unit tests to allow them to change the default disk store name.
+ */
@VisibleForTesting
- void applyJarAndXmlFromClusterConfig() {
- if (configurationResponse == null) {
- // Deploy all the jars from the deploy working dir.
- ClassPathLoader.getLatest().getJarDeployer().loadPreviouslyDeployedJarsFromDisk();
- }
- ccLoader.applyClusterXmlConfiguration(this, configurationResponse,
- system.getConfig().getGroups());
+ public static void setDefaultDiskStoreName(String dsName) {
+ defaultDiskStoreName = dsName;
}
/**
- * Initialize any services provided as extensions to the cache using service loader.
+ * @throws IllegalArgumentException if path is not valid
*/
- private void initializeServices() {
- ModuleServiceResult<Set<CacheService>> loadedServices =
- modulesService.loadService(CacheService.class);
- if (loadedServices.isSuccessful()) {
- // ServiceLoader<CacheService> loader = ServiceLoader.load(CacheService.class);
- for (CacheService service : loadedServices.getMessage()) {
- try {
- if (service.init(this)) {
- this.services.put(service.getInterface(), service);
- logger.info("Initialized cache service {}", service.getClass().getName());
- }
- } catch (Exception ex) {
- logger.warn("Cache service " + service.getClass().getName() + " failed to initialize",
- ex);
- }
- }
- } else {
- logger.warn(loadedServices.getErrorMessage());
+ private static void validatePath(String path) {
+ if (path == null) {
+ throw new IllegalArgumentException("path cannot be null");
}
- }
-
- private void handleResourceEventsForCacheServices() {
- for (CacheService service : services.values()) {
- system.handleResourceEvent(ResourceEvent.CACHE_SERVICE_CREATE, service);
+ if (path.isEmpty()) {
+ throw new IllegalArgumentException("path cannot be empty");
}
- }
-
- private boolean isServerNode() {
- return system.getDistributedMember().getVmKind() != LOCATOR_DM_TYPE
- && system.getDistributedMember().getVmKind() != ADMIN_ONLY_DM_TYPE
- && !isClient();
- }
-
- private void startRestAgentServer(InternalCache cache) {
- if (system.getConfig().getStartDevRestApi() && isServerNode()) {
- restAgent = new RestAgent(system.getConfig(), securityService);
- restAgent.start(cache);
- } else {
- restAgent = null;
+ if (path.equals(SEPARATOR)) {
+ throw new IllegalArgumentException(String.format("path cannot be ' %s '", SEPARATOR));
}
}
- @Override
- public URL getCacheXmlURL() {
- if (getMyId().getVmKind() == LOCATOR_DM_TYPE) {
- return null;
- }
-
- File xmlFile = testCacheXml;
- if (xmlFile == null) {
- xmlFile = system.getConfig().getCacheXmlFile();
- }
- if (xmlFile.getName().isEmpty()) {
- return null;
+ /**
+ * @return array of two Strings, the root name and the relative path from root. If there is no
+ * relative path from root, then String[1] will be an empty string
+ */
+ static String[] parsePath(String path) {
+ validatePath(path);
+ String[] result = new String[2];
+ result[1] = "";
+ // strip off root name from path
+ int slashIndex = path.indexOf(SEPARATOR_CHAR);
+ if (slashIndex == 0) {
+ path = path.substring(1);
+ slashIndex = path.indexOf(SEPARATOR_CHAR);
}
-
- URL url;
- if (!xmlFile.exists() || !xmlFile.isFile()) {
- // do a resource search
- String resource = xmlFile.getPath();
- resource = DOUBLE_BACKSLASH.matcher(resource).replaceAll("/");
- if (resource.length() > 1 && resource.startsWith("/")) {
- resource = resource.substring(1);
- }
- url = ClassPathLoader.getLatest().getResource(getClass(), resource);
- } else {
- try {
- url = xmlFile.toURL();
- } catch (MalformedURLException ex) {
- throw new CacheXmlException(
- String.format("Could not convert XML file %s to an URL.", xmlFile), ex);
- }
+ result[0] = path;
+ if (slashIndex > 0) {
+ result[0] = path.substring(0, slashIndex);
+ result[1] = path.substring(slashIndex + 1);
}
+ return result;
+ }
- if (url == null) {
- File defaultFile = DistributionConfig.DEFAULT_CACHE_XML_FILE;
- if (!xmlFile.equals(defaultFile)) {
- if (!xmlFile.exists()) {
- throw new CacheXmlException(
- String.format("Declarative Cache XML file/resource %s does not exist.", xmlFile));
- }
- throw new CacheXmlException(
- String.format("Declarative XML file %s is not a file.", xmlFile));
- }
+ /**
+ * Add the {@code CacheLifecycleListener}.
+ */
+ public static void addCacheLifecycleListener(CacheLifecycleListener listener) {
+ synchronized (GemFireCacheImpl.class) {
+ cacheLifecycleListeners.add(listener);
}
-
- return url;
}
/**
- * Initialize the contents of this {@code Cache} according to the declarative caching XML file
- * specified by the given {@code DistributedSystem}. Note that this operation cannot be performed
- * in the constructor because creating regions in the cache, etc. uses the cache itself (which
- * isn't initialized until the constructor returns).
+ * Remove the {@code CacheLifecycleListener}.
*
- * @throws CacheXmlException If something goes wrong while parsing the declarative caching XML
- * file.
- * @throws TimeoutException If a {@link Region#put(Object, Object)}times out while initializing
- * the cache.
- * @throws CacheWriterException If a {@code CacheWriterException} is thrown while initializing the
- * cache.
- * @throws RegionExistsException If the declarative caching XML file describes a region that
- * already exists (including the root region).
- * @throws GatewayException If a {@code GatewayException} is thrown while initializing the cache.
- * @see #loadCacheXml
+ * @return true if the listener was removed
*/
- private void initializeDeclarativeCache()
- throws TimeoutException, CacheWriterException, GatewayException, RegionExistsException {
- URL url = getCacheXmlURL();
- String cacheXmlDescription = cacheConfig.getCacheXMLDescription();
- if (url == null && cacheXmlDescription == null) {
- initializePdxRegistry();
- readyDynamicRegionFactory();
- // nothing needs to be done
- return;
+ public static boolean removeCacheLifecycleListener(CacheLifecycleListener listener) {
+ synchronized (GemFireCacheImpl.class) {
+ return cacheLifecycleListeners.remove(listener);
}
+ }
- InputStream stream = null;
+ private static void closeQuietly(Closeable closeable) {
try {
- logCacheXML(url, cacheXmlDescription);
- if (cacheXmlDescription != null) {
- if (logger.isTraceEnabled()) {
- logger.trace("initializing cache with generated XML: {}", cacheXmlDescription);
- }
- stream = new StringBufferInputStream(cacheXmlDescription);
- } else {
- stream = url.openStream();
+ if (closeable != null) {
+ closeable.close();
}
- loadCacheXml(stream);
-
- } catch (IOException ex) {
- throw new CacheXmlException(String.format(
- "While opening Cache XML %s the following error occurred %s", url.toString(), ex));
-
- } catch (CacheXmlException ex) {
- throw new CacheXmlException(
- String.format("While reading Cache XML %s. %s", url, ex.getMessage()), ex.getCause());
-
- } finally {
- closeQuietly(stream);
+ } catch (IOException ignore) {
}
}
- private static void logCacheXML(URL url, String cacheXmlDescription) {
- if (cacheXmlDescription == null) {
- StringBuilder sb = new StringBuilder();
- BufferedReader br = null;
- try {
- br = new BufferedReader(new InputStreamReader(url.openStream()));
- String line = br.readLine();
- while (line != null) {
- if (!line.isEmpty()) {
- sb.append(lineSeparator()).append(line);
- }
- line = br.readLine();
- }
- } catch (IOException ignore) {
- } finally {
- closeQuietly(br);
- }
- logger.info("Initializing cache using {}:{}", url, sb);
-
- } else {
- logger.info("Initializing cache using {}:{}", "generated description from old cache",
- cacheXmlDescription);
+ private static RegionService createAuthenticatedCacheView(Pool pool, Properties properties) {
+ if (pool.getMultiuserAuthentication()) {
+ return ((PoolImpl) pool).createAuthenticatedCacheView(properties);
}
+ throw new IllegalStateException(
+ "The pool " + pool.getName() + " did not have multiuser-authentication set to true");
}
- @Override
- public synchronized void initializePdxRegistry() {
- if (pdxRegistry == null) {
- // The member with locator is initialized with a NullTypePdxRegistration
- if (getMyId().getVmKind() == LOCATOR_DM_TYPE) {
- pdxRegistry = typeRegistryFactory.create(this, true);
- } else {
- pdxRegistry = typeRegistryFactory.create(this, false);
- }
- pdxRegistry.initialize();
- }
+ public static void initializeRegionShortcuts(Cache cache) {
+ for (RegionShortcut shortcut : RegionShortcut.values()) {
+ switch (shortcut) {
+ case PARTITION: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.PARTITION);
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ af.setPartitionAttributes(paf.create());
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case PARTITION_REDUNDANT: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.PARTITION);
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setRedundantCopies(1);
+ af.setPartitionAttributes(paf.create());
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case PARTITION_PERSISTENT: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ af.setPartitionAttributes(paf.create());
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case PARTITION_REDUNDANT_PERSISTENT: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setRedundantCopies(1);
+ af.setPartitionAttributes(paf.create());
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case PARTITION_OVERFLOW: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.PARTITION);
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ af.setPartitionAttributes(paf.create());
+ af.setEvictionAttributes(
+ EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case PARTITION_REDUNDANT_OVERFLOW: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.PARTITION);
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setRedundantCopies(1);
+ af.setPartitionAttributes(paf.create());
+ af.setEvictionAttributes(
+ EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case PARTITION_PERSISTENT_OVERFLOW: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ af.setPartitionAttributes(paf.create());
+ af.setEvictionAttributes(
+ EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case PARTITION_REDUNDANT_PERSISTENT_OVERFLOW: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setRedundantCopies(1);
+ af.setPartitionAttributes(paf.create());
+ af.setEvictionAttributes(
+ EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case PARTITION_HEAP_LRU: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.PARTITION);
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ af.setPartitionAttributes(paf.create());
+ af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes());
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case PARTITION_REDUNDANT_HEAP_LRU: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.PARTITION);
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setRedundantCopies(1);
+ af.setPartitionAttributes(paf.create());
+ af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes());
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case REPLICATE: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.REPLICATE);
+ af.setScope(Scope.DISTRIBUTED_ACK);
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case REPLICATE_PERSISTENT: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
+ af.setScope(Scope.DISTRIBUTED_ACK);
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case REPLICATE_OVERFLOW: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.REPLICATE);
+ af.setScope(Scope.DISTRIBUTED_ACK);
+ af.setEvictionAttributes(
+ EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case REPLICATE_PERSISTENT_OVERFLOW: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
+ af.setScope(Scope.DISTRIBUTED_ACK);
+ af.setEvictionAttributes(
+ EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case REPLICATE_HEAP_LRU: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.REPLICATE);
+ af.setScope(Scope.DISTRIBUTED_ACK);
+ af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes());
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case LOCAL: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.NORMAL);
+ af.setScope(Scope.LOCAL);
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case LOCAL_PERSISTENT: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
+ af.setScope(Scope.LOCAL);
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case LOCAL_HEAP_LRU: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.NORMAL);
+ af.setScope(Scope.LOCAL);
+ af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes());
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case LOCAL_OVERFLOW: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.NORMAL);
+ af.setScope(Scope.LOCAL);
+ af.setEvictionAttributes(
+ EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case LOCAL_PERSISTENT_OVERFLOW: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
+ af.setScope(Scope.LOCAL);
+ af.setEvictionAttributes(
+ EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case PARTITION_PROXY: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.PARTITION);
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setLocalMaxMemory(0);
+ af.setPartitionAttributes(paf.create());
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case PARTITION_PROXY_REDUNDANT: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.PARTITION);
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setLocalMaxMemory(0);
+ paf.setRedundantCopies(1);
+ af.setPartitionAttributes(paf.create());
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case REPLICATE_PROXY: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.EMPTY);
+ af.setScope(Scope.DISTRIBUTED_ACK);
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ default:
+ throw new IllegalStateException("unhandled enum " + shortcut);
+ }
+ }
+ }
+
+ public static void initializeClientRegionShortcuts(Cache cache) {
+ for (ClientRegionShortcut shortcut : ClientRegionShortcut.values()) {
+ switch (shortcut) {
+ case LOCAL: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.NORMAL);
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case LOCAL_PERSISTENT: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case LOCAL_HEAP_LRU: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.NORMAL);
+ af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes());
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case LOCAL_OVERFLOW: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.NORMAL);
+ af.setEvictionAttributes(
+ EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case LOCAL_PERSISTENT_OVERFLOW: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
+ af.setEvictionAttributes(
+ EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+ cache.setRegionAttributes(shortcut.toString(), af.create());
+ break;
+ }
+ case PROXY: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.EMPTY);
+ UserSpecifiedRegionAttributes<?, ?> attributes =
+ (UserSpecifiedRegionAttributes) af.create();
+ attributes.requiresPoolName = true;
+ cache.setRegionAttributes(shortcut.toString(), attributes);
+ break;
+ }
+ case CACHING_PROXY: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.NORMAL);
+ UserSpecifiedRegionAttributes<?, ?> attributes =
+ (UserSpecifiedRegionAttributes) af.create();
+ attributes.requiresPoolName = true;
+ cache.setRegionAttributes(shortcut.toString(), attributes);
+ break;
+ }
+ case CACHING_PROXY_HEAP_LRU: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.NORMAL);
+ af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes());
+ UserSpecifiedRegionAttributes<?, ?> attributes =
+ (UserSpecifiedRegionAttributes) af.create();
+ attributes.requiresPoolName = true;
+ cache.setRegionAttributes(shortcut.toString(), attributes);
+ break;
+ }
+ case CACHING_PROXY_OVERFLOW: {
+ AttributesFactory<?, ?> af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.NORMAL);
+ af.setEvictionAttributes(
+ EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+ UserSpecifiedRegionAttributes<?, ?> attributes =
+ (UserSpecifiedRegionAttributes) af.create();
+ attributes.requiresPoolName = true;
+ cache.setRegionAttributes(shortcut.toString(), attributes);
+ break;
+ }
+ default:
+ throw new IllegalStateException("unhandled enum " + shortcut);
+ }
+ }
+ }
+
+ @Override
+ public void lockDiskStore(String diskStoreName) {
+ doLockDiskStore(diskStoreName);
+ }
+
+ /**
+ * If the disk store is not associated with a {@code CountDownLatch},
+ * constructs a {@code CountDownLatch} with count one, and associate it with the disk store;
+ * otherwise wait until the associated {@code CountDownLatch} has counted down to zero.
+ *
+ * @param diskStoreName the name of the disk store
+ * @return {@code true} if it does not call {@code CountDownLatch.await()};
+ * {@code false} otherwise.
+ */
+ @VisibleForTesting
+ boolean doLockDiskStore(String diskStoreName) {
+ CountDownLatch countDownLatch =
+ diskStoreLatches.putIfAbsent(diskStoreName, new CountDownLatch(1));
+ if (countDownLatch != null) {
+ try {
+ countDownLatch.await();
+ return false;
+ } catch (InterruptedException e) {
+ throw new InternalGemFireError(e);
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void unlockDiskStore(String diskStoreName) {
+ doUnlockDiskStore(diskStoreName);
+ }
+
+ /**
+ * Decrements the count of the {@code CountDownLatch} associated with the disk store.
+ *
+ * @param diskStoreName the name of the disk store
+ * @return {@code true} if the disk store associated {@code CountDownLatch} has decremented;
+ * {@code false} otherwise.
+ */
+ @VisibleForTesting
+ boolean doUnlockDiskStore(String diskStoreName) {
+ CountDownLatch countDownLatch = diskStoreLatches.get(diskStoreName);
+ if (countDownLatch != null) {
+ countDownLatch.countDown();
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * This is for debugging cache-open issues such as {@link CacheExistsException}.
+ */
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("GemFireCache[");
+ sb.append("id = ").append(System.identityHashCode(this));
+ sb.append("; isClosing = ").append(isClosing);
+ sb.append("; isShutDownAll = ").append(isCacheAtShutdownAll());
+ sb.append("; created = ").append(creationDate);
+ sb.append("; server = ").append(isServer);
+ sb.append("; copyOnRead = ").append(copyOnRead);
+ sb.append("; lockLease = ").append(lockLease);
+ sb.append("; lockTimeout = ").append(lockTimeout);
+ if (creationStack != null) {
+ sb.append(lineSeparator()).append("Creation context:").append(lineSeparator());
+ sb.append(getStackTrace(creationStack));
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+ @Override
+ public void throwCacheExistsException() {
+ throw new CacheExistsException(this, String.format("%s: An open cache already exists.", this),
+ creationStack);
+ }
+
+ @Override
+ public MeterRegistry getMeterRegistry() {
+ return system.getMeterRegistry();
+ }
+
+ @Override
+ public void saveCacheXmlForReconnect() {
+ // there are two versions of this method so it can be unit-tested
+ boolean sharedConfigEnabled = getDistributionManager().getConfig().getUseSharedConfiguration();
+
+ if (!Boolean.getBoolean(GEMFIRE_PREFIX + "autoReconnect-useCacheXMLFile")
+ && !sharedConfigEnabled) {
+ try {
+ logger.info("generating XML to rebuild the cache after reconnect completes");
+ StringPrintWriter pw = new StringPrintWriter();
+ CacheXmlGenerator.generate((Cache) this, pw, false);
+ String cacheXML = pw.toString();
+ getCacheConfig().setCacheXMLDescription(cacheXML);
+ logger.info("XML generation completed: {}", cacheXML);
+ } catch (CancelException e) {
+ logger.info("Unable to generate XML description for reconnect of cache due to exception",
+ e);
+ }
+ } else if (sharedConfigEnabled && !getCacheServers().isEmpty()) {
+ // we need to retain a cache-server description if this JVM was started by gfsh
+ List<CacheServerCreation> list = new ArrayList<>(getCacheServers().size());
+ for (Object o : getCacheServers()) {
+ CacheServerImpl cs = (CacheServerImpl) o;
+ if (cs.isDefaultServer()) {
+ CacheServerCreation bsc = new CacheServerCreation(this, cs);
+ list.add(bsc);
+ }
+ }
+ getCacheConfig().setCacheServerCreation(list);
+ logger.info("CacheServer configuration saved");
+ }
+ }
+
+ @Override
+ public void reLoadClusterConfiguration() throws IOException, ClassNotFoundException {
+ configurationResponse = requestSharedConfiguration();
+ if (configurationResponse != null) {
+ ccLoader.deployJarsReceivedFromClusterConfiguration(configurationResponse);
+ ccLoader.applyClusterPropertiesConfiguration(configurationResponse, system.getConfig());
+ ccLoader.applyClusterXmlConfiguration(this, configurationResponse,
+ system.getConfig().getGroups());
+ initializeDeclarativeCache();
+ }
+ }
+
+ /**
+ * Initialize the EventTracker's timer task for tracking and shutdown purposes.
+ */
+ private EventTrackerExpiryTask createEventTrackerExpiryTask() {
+ long lifetimeInMillis = Long.getLong(GEMFIRE_PREFIX + "messageTrackingTimeout",
+ PoolFactory.DEFAULT_SUBSCRIPTION_MESSAGE_TRACKING_TIMEOUT / 3);
+ EventTrackerExpiryTask task = new EventTrackerExpiryTask(lifetimeInMillis);
+ getCCPTimer().scheduleAtFixedRate(task, lifetimeInMillis, lifetimeInMillis);
+ return task;
+ }
+
+ @Override
+ public SecurityService getSecurityService() {
+ return securityService;
+ }
+
+ @Override
+ public boolean isRESTServiceRunning() {
+ return isRESTServiceRunning;
+ }
+
+ @Override
+ public void setRESTServiceRunning(boolean isRESTServiceRunning) {
+ this.isRESTServiceRunning = isRESTServiceRunning;
+ }
+
+ @Override
+ @VisibleForTesting
+ public RestAgent getRestAgent() {
+ return restAgent;
+ }
+
+ /**
+ * Request the cluster configuration from the locator(s) if cluster config service is running.
+ */
+ @VisibleForTesting
+ ConfigurationResponse requestSharedConfiguration() {
+ DistributionConfig config = system.getConfig();
+
+ if (!(dm instanceof ClusterDistributionManager)) {
+ return null;
+ }
+
+ // do nothing if this vm is/has locator or this is a client
+ if (dm.getDMType() == LOCATOR_DM_TYPE || isClient || Locator.getLocator() != null) {
+ return null;
+ }
+
+ // can't simply return null if server is not using shared configuration, since we need to find
+ // out if the locator is running in secure mode or not, if yes, then we need to throw an
+ // exception if server is not using cluster config.
+
+ Map<InternalDistributedMember, Collection<String>> locatorsWithClusterConfig =
+ getDistributionManager().getAllHostedLocatorsWithSharedConfiguration();
+
+ // If there are no locators with Shared configuration, that means the system has been started
+ // without shared configuration then do not make requests to the locators.
+ if (locatorsWithClusterConfig.isEmpty()) {
+ logger.info("No locator(s) found with cluster configuration service");
+ return null;
+ }
+
+ try {
+ ConfigurationResponse response = ccLoader.requestConfigurationFromLocators(
+ system.getConfig().getGroups(), locatorsWithClusterConfig.keySet());
+
+ // log the configuration received from the locator
+ logger.info("Received cluster configuration from the locator");
+ logger.info(response.describeConfig());
+
+ Configuration clusterConfig = response.getRequestedConfiguration().get(CLUSTER_CONFIG);
+ Properties clusterSecProperties =
+ clusterConfig == null ? new Properties() : clusterConfig.getGemfireProperties();
+
+ // If not using shared configuration, return null or throw an exception is locator is secured
+ if (!config.getUseSharedConfiguration()) {
+ if (clusterSecProperties.containsKey(SECURITY_MANAGER)) {
+ throw new GemFireConfigException(
+ "A server must use cluster configuration when joining a secured cluster.");
+ }
+ logger.info(
+ "The cache has been created with use-cluster-configuration=false. It will not receive any cluster configuration");
+ return null;
+ }
+
+ Properties serverSecProperties = config.getSecurityProps();
+ // check for possible mis-configuration
+ if (isMisConfigured(clusterSecProperties, serverSecProperties, SECURITY_MANAGER)
+ || isMisConfigured(clusterSecProperties, serverSecProperties, SECURITY_POST_PROCESSOR)) {
+ throw new GemFireConfigException(
+ "A server cannot specify its own security-manager or security-post-processor when using cluster configuration");
+ }
+ return response;
+
+ } catch (ClusterConfigurationNotAvailableException e) {
+ throw new GemFireConfigException("cluster configuration service not available", e);
+ } catch (UnknownHostException e) {
+ throw new GemFireConfigException(e.getLocalizedMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean isClient() {
+ return isClient;
+ }
+
+ @Override
+ public boolean hasPool() {
+ return isClient || !getAllPools().isEmpty();
+ }
+
+ @Override
+ public synchronized Pool getDefaultPool() {
+ if (defaultPool == null) {
+ determineDefaultPool();
+ }
+ return defaultPool;
+ }
+
+ @Override
+ public void initialize() {
+ for (CacheLifecycleListener listener : cacheLifecycleListeners) {
+ listener.cacheCreated(this);
+ }
+
+ if (isClient()) {
+ initializeClientRegionShortcuts(this);
+ } else {
+ initializeRegionShortcuts(this);
+ }
+
+ // set ClassPathLoader and then deploy cluster config jars
+ ClassPathLoader.setLatestToDefault(system.getConfig().getDeployWorkingDir());
+
+ try {
+ ccLoader.deployJarsReceivedFromClusterConfiguration(configurationResponse);
+ } catch (IOException | ClassNotFoundException e) {
+ throw new GemFireConfigException(
+ "Exception while deploying the jars received as a part of cluster Configuration",
+ e);
+ }
+
+ SystemMemberCacheEventProcessor.send(this, Operation.CACHE_CREATE);
+ resourceAdvisor.initializationGate();
+
+ // Register function that we need to execute to fetch available REST service endpoints in DS
+ functionServiceRegisterFunction.accept(new FindRestEnabledServersFunction());
+
+ // Entry to GemFire Management service
+ jmxAdvisor.initializationGate();
+
+ initializeServices();
+
+ // This starts up the ManagementService, registers and federates the internal beans. Since it
+ // may be starting up web services, it relies on the prior step which would have started the
+ // HttpService.
+ system.handleResourceEvent(ResourceEvent.CACHE_CREATE, this);
+
+ // Resource events, generated for started services. These events may depend on the prior
+ // CACHE_CREATE event which is why they are split out separately.
+ handleResourceEventsForCacheServices();
+
+ boolean completedCacheXml = false;
+ try {
+ if (!isClient) {
+ applyJarAndXmlFromClusterConfig();
+ }
+ initializeDeclarativeCache();
+ completedCacheXml = true;
+ } catch (RuntimeException e) {
+ logger.error("Cache initialization for " + this.toString() + " failed because:", e);
+ throw e;
+ } finally {
+ if (!completedCacheXml) {
+ // so initializeDeclarativeCache threw an exception
+ try {
+ close();
+ } catch (Throwable ignore) {
+ // I don't want init to throw an exception that came from the close.
+ // I want it to throw the original exception that came from initializeDeclarativeCache.
+ }
+ configurationResponse = null;
+ }
+ }
+
+ boolean completedHandleResourceEvent = false;
+ try {
+ system.handleResourceEvent(ResourceEvent.CLUSTER_CONFIGURATION_APPLIED, this);
+ completedHandleResourceEvent = true;
+ } finally {
+ if (!completedHandleResourceEvent) {
+ try {
+ close();
+ } catch (Throwable ignore) {
+ // We don't want to throw an exception that came from the close
+ // so throw original exception from handleResourceEvent.
+ }
+ }
+ }
+
+ startColocatedJmxManagerLocator();
+
+ startRestAgentServer(this);
+
+ isInitialized = true;
+ }
+
+ @VisibleForTesting
+ void applyJarAndXmlFromClusterConfig() {
+ if (configurationResponse == null) {
+ // Deploy all the jars from the deploy working dir.
+ ClassPathLoader.getLatest().getJarDeployer().loadPreviouslyDeployedJarsFromDisk();
+ }
+ ccLoader.applyClusterXmlConfiguration(this, configurationResponse,
+ system.getConfig().getGroups());
+ }
+
+ /**
+ * Initialize any services provided as extensions to the cache using service loader.
+ */
+ private void initializeServices() {
+ ModuleServiceResult<Set<CacheService>> loadedServices =
+ modulesService.loadService(CacheService.class);
+ if (loadedServices.isSuccessful()) {
+ // ServiceLoader<CacheService> loader = ServiceLoader.load(CacheService.class);
+ for (CacheService service : loadedServices.getMessage()) {
+ try {
+ if (service.init(this)) {
+ this.services.put(service.getInterface(), service);
+ logger.info("Initialized cache service {}", service.getClass().getName());
+ }
+ } catch (Exception ex) {
+ logger.warn("Cache service " + service.getClass().getName() + " failed to initialize",
+ ex);
+ }
+ }
+ } else {
+ logger.warn(loadedServices.getErrorMessage());
+ }
+ }
+
+ private void handleResourceEventsForCacheServices() {
+ for (CacheService service : services.values()) {
+ system.handleResourceEvent(ResourceEvent.CACHE_SERVICE_CREATE, service);
+ }
+ }
+
+ private boolean isServerNode() {
+ return system.getDistributedMember().getVmKind() != LOCATOR_DM_TYPE
+ && system.getDistributedMember().getVmKind() != ADMIN_ONLY_DM_TYPE
+ && !isClient();
+ }
+
+ private void startRestAgentServer(InternalCache cache) {
+ if (system.getConfig().getStartDevRestApi() && isServerNode()) {
+ restAgent = new RestAgent(system.getConfig(), securityService);
+ restAgent.start(cache);
+ } else {
+ restAgent = null;
+ }
+ }
+
+ @Override
+ public URL getCacheXmlURL() {
+ if (getMyId().getVmKind() == LOCATOR_DM_TYPE) {
+ return null;
+ }
+
+ File xmlFile = testCacheXml;
+ if (xmlFile == null) {
+ xmlFile = system.getConfig().getCacheXmlFile();
+ }
+ if (xmlFile.getName().isEmpty()) {
+ return null;
+ }
+
+ URL url;
+ if (!xmlFile.exists() || !xmlFile.isFile()) {
+ // do a resource search
+ String resource = xmlFile.getPath();
+ resource = DOUBLE_BACKSLASH.matcher(resource).replaceAll("/");
+ if (resource.length() > 1 && resource.startsWith("/")) {
+ resource = resource.substring(1);
+ }
+ url = ClassPathLoader.getLatest().getResource(getClass(), resource);
+ } else {
+ try {
+ url = xmlFile.toURL();
+ } catch (MalformedURLException ex) {
+ throw new CacheXmlException(
+ String.format("Could not convert XML file %s to an URL.", xmlFile), ex);
+ }
+ }
+
+ if (url == null) {
+ File defaultFile = DistributionConfig.DEFAULT_CACHE_XML_FILE;
+ if (!xmlFile.equals(defaultFile)) {
+ if (!xmlFile.exists()) {
+ throw new CacheXmlException(
+ String.format("Declarative Cache XML file/resource %s does not exist.", xmlFile));
+ }
+ throw new CacheXmlException(
+ String.format("Declarative XML file %s is not a file.", xmlFile));
+ }
+ }
+
+ return url;
+ }
+
+ /**
+ * Initialize the contents of this {@code Cache} according to the declarative caching XML file
+ * specified by the given {@code DistributedSystem}. Note that this operation cannot be performed
+ * in the constructor because creating regions in the cache, etc. uses the cache itself (which
+ * isn't initialized until the constructor returns).
+ *
+ * @throws CacheXmlException If something goes wrong while parsing the declarative caching XML
+ * file.
+ * @throws TimeoutException If a {@link Region#put(Object, Object)}times out while initializing
+ * the cache.
+ * @throws CacheWriterException If a {@code CacheWriterException} is thrown while initializing the
+ * cache.
+ * @throws RegionExistsException If the declarative caching XML file describes a region that
+ * already exists (including the root region).
+ * @throws GatewayException If a {@code GatewayException} is thrown while initializing the cache.
+ * @see #loadCacheXml
+ */
+ private void initializeDeclarativeCache()
+ throws TimeoutException, CacheWriterException, GatewayException, RegionExistsException {
+ URL url = getCacheXmlURL();
+ String cacheXmlDescription = cacheConfig.getCacheXMLDescription();
+ if (url == null && cacheXmlDescription == null) {
+ initializePdxRegistry();
+ readyDynamicRegionFactory();
+ // nothing needs to be done
+ return;
+ }
+
+ InputStream stream = null;
+ try {
+ logCacheXML(url, cacheXmlDescription);
+ if (cacheXmlDescription != null) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("initializing cache with generated XML: {}", cacheXmlDescription);
+ }
+ stream = new StringBufferInputStream(cacheXmlDescription);
+ } else {
+ stream = url.openStream();
+ }
+ loadCacheXml(stream);
+
+ } catch (IOException ex) {
+ throw new CacheXmlException(String.format(
+ "While opening Cache XML %s the following error occurred %s", url.toString(), ex));
+
+ } catch (CacheXmlException ex) {
+ throw new CacheXmlException(
+ String.format("While reading Cache XML %s. %s", url, ex.getMessage()), ex.getCause());
+
+ } finally {
+ closeQuietly(stream);
+ }
+ }
+
+ @Override
+ public synchronized void initializePdxRegistry() {
+ if (pdxRegistry == null) {
+ // The member with locator is initialized with a NullTypePdxRegistration
+ if (getMyId().getVmKind() == LOCATOR_DM_TYPE) {
+ pdxRegistry = typeRegistryFactory.create(this, true);
+ } else {
+ pdxRegistry = typeRegistryFactory.create(this, false);
+ }
+ pdxRegistry.initialize();
+ }
}
@Override
@@ -1707,70 +2050,22 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
result = new CacheClosedException(reason);
}
return result;
- }
-
- @Override
- @VisibleForTesting
- public Throwable getDisconnectCause() {
- return disconnectCause;
- }
-
- @Override
- public boolean keepDurableSubscriptionsAlive() {
- return keepAlive;
- }
-
- /**
- * Close the distributed system, cache servers, and gateways. Clears the rootRegions and
- * partitionedRegions map. Marks the cache as closed.
- *
- * @see SystemFailure#emergencyClose()
- */
- public static void emergencyClose() {
- GemFireCacheImpl cache = getInstance();
- if (cache == null) {
- return;
- }
-
- // leave the PdxSerializer set if we have one
-
- // Shut down messaging first
- InternalDistributedSystem ids = cache.system;
- if (ids != null) {
- ids.emergencyClose();
- }
-
- cache.disconnectCause = SystemFailure.getFailure();
- cache.isClosing = true;
-
- for (InternalCacheServer cacheServer : cache.allCacheServers) {
- Acceptor acceptor = cacheServer.getAcceptor();
- if (acceptor != null) {
- acceptor.emergencyClose();
- }
- }
-
- closeGateWayReceiverServers(cache);
-
- PoolManagerImpl.emergencyClose();
-
- // rootRegions is intentionally *not* synchronized. The
- // implementation of clear() does not currently allocate objects.
- cache.rootRegions.clear();
+ }
- // partitionedRegions is intentionally *not* synchronized, The
- // implementation of clear() does not currently allocate objects.
- cache.partitionedRegions.clear();
+ @Override
+ @VisibleForTesting
+ public Throwable getDisconnectCause() {
+ return disconnectCause;
}
- private static void closeGateWayReceiverServers(GemFireCacheImpl cache) {
- InternalCacheServer receiverServer = cache.gatewayReceiverServer.get();
- if (receiverServer != null) {
- Acceptor acceptor = receiverServer.getAcceptor();
- if (acceptor != null) {
- acceptor.emergencyClose();
- }
- }
+ @VisibleForTesting
+ void setDisconnectCause(Throwable disconnectCause) {
+ this.disconnectCause = disconnectCause;
+ }
+
+ @Override
+ public boolean keepDurableSubscriptionsAlive() {
+ return keepAlive;
}
@Override
@@ -1979,21 +2274,6 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
}
}
- private static Map<InternalDistributedMember, PersistentMemberID> getSubMapForLiveMembers(
- Set<InternalDistributedMember> membersToPersistOfflineEqual,
- Map<InternalDistributedMember, PersistentMemberID> bucketMap) {
- if (bucketMap == null) {
- return null;
- }
- Map<InternalDistributedMember, PersistentMemberID> persistMap = new HashMap<>();
- for (InternalDistributedMember member : membersToPersistOfflineEqual) {
- if (bucketMap.containsKey(member)) {
- persistMap.put(member, bucketMap.get(member));
- }
- }
- return persistMap;
- }
-
@Override
public void close() {
close(false);
@@ -2095,6 +2375,14 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
}
}
+ /**
+ * Used by test to inject an evictor.
+ */
+ @VisibleForTesting
+ void setHeapEvictor(HeapEvictor evictor) {
+ heapEvictor = evictor;
+ }
+
@Override
@VisibleForTesting
public OffHeapEvictor getOffHeapEvictor() {
@@ -2115,14 +2403,6 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
offHeapEvictor = evictor;
}
- /**
- * Used by test to inject an evictor.
- */
- @VisibleForTesting
- void setHeapEvictor(HeapEvictor evictor) {
- heapEvictor = evictor;
- }
-
@Override
public PersistentMemberManager getPersistentMemberManager() {
return persistentMemberManager;
@@ -2599,18 +2879,6 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
}
}
- /**
- * Used by unit tests to allow them to change the default disk store name.
- */
- @VisibleForTesting
- public static void setDefaultDiskStoreName(String dsName) {
- defaultDiskStoreName = dsName;
- }
-
- public static String getDefaultDiskStoreName() {
- return defaultDiskStoreName;
- }
-
@Override
public DiskStoreImpl getOrCreateDefaultDiskStore() {
DiskStoreImpl result = (DiskStoreImpl) findDiskStore(null);
@@ -3245,21 +3513,6 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
}
}
- /**
- * @throws IllegalArgumentException if path is not valid
- */
- private static void validatePath(String path) {
- if (path == null) {
- throw new IllegalArgumentException("path cannot be null");
- }
- if (path.isEmpty()) {
- throw new IllegalArgumentException("path cannot be empty");
- }
- if (path.equals(SEPARATOR)) {
- throw new IllegalArgumentException(String.format("path cannot be ' %s '", SEPARATOR));
- }
- }
-
@Override
public <K, V> Region<K, V> getRegionByPath(String path) {
return uncheckedCast(getInternalRegionByPath(path));
@@ -3543,13 +3796,13 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
}
@Override
- public void setCopyOnRead(boolean copyOnRead) {
- this.copyOnRead = copyOnRead;
+ public boolean getCopyOnRead() {
+ return copyOnRead;
}
@Override
- public boolean getCopyOnRead() {
- return copyOnRead;
+ public void setCopyOnRead(boolean copyOnRead) {
+ this.copyOnRead = copyOnRead;
}
@Override
@@ -3560,48 +3813,6 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
}
- /**
- * @return array of two Strings, the root name and the relative path from root. If there is no
- * relative path from root, then String[1] will be an empty string
- */
- static String[] parsePath(String path) {
- validatePath(path);
- String[] result = new String[2];
- result[1] = "";
- // strip off root name from path
- int slashIndex = path.indexOf(SEPARATOR_CHAR);
- if (slashIndex == 0) {
- path = path.substring(1);
- slashIndex = path.indexOf(SEPARATOR_CHAR);
- }
- result[0] = path;
- if (slashIndex > 0) {
- result[0] = path.substring(0, slashIndex);
- result[1] = path.substring(slashIndex + 1);
- }
- return result;
- }
-
- /**
- * Add the {@code CacheLifecycleListener}.
- */
- public static void addCacheLifecycleListener(CacheLifecycleListener listener) {
- synchronized (GemFireCacheImpl.class) {
- cacheLifecycleListeners.add(listener);
- }
- }
-
- /**
- * Remove the {@code CacheLifecycleListener}.
- *
- * @return true if the listener was removed
- */
- public static boolean removeCacheLifecycleListener(CacheLifecycleListener listener) {
- synchronized (GemFireCacheImpl.class) {
- return cacheLifecycleListeners.remove(listener);
- }
- }
-
@Override
public void addRegionListener(RegionListener regionListener) {
regionListeners.add(regionListener);
@@ -4200,15 +4411,6 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
}
}
- private static void closeQuietly(Closeable closeable) {
- try {
- if (closeable != null) {
- closeable.close();
- }
- } catch (IOException ignore) {
- }
- }
-
@Override
public void readyForEvents() {
if (isClient()) {
@@ -4246,13 +4448,13 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
}
@Override
- public void setBackupFiles(List<File> backups) {
- backupFiles = backups;
+ public List<File> getBackupFiles() {
+ return unmodifiableList(backupFiles);
}
@Override
- public List<File> getBackupFiles() {
- return unmodifiableList(backupFiles);
+ public void setBackupFiles(List<File> backups) {
+ backupFiles = backups;
}
@Override
@@ -4369,434 +4571,132 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
this,
maxTime);
if (logger.isDebugEnabled()) {
- logger.debug("QueryMonitor thread started.");
- }
- queryMonitor = tempQueryMonitor;
- }
- }
- }
- return tempQueryMonitor;
- }
-
- private void sendAddCacheServerProfileMessage() {
- Set<InternalDistributedMember> otherMembers = dm.getOtherDistributionManagerIds();
- AddCacheServerProfileMessage message = new AddCacheServerProfileMessage();
- message.operateOnLocalCache(this);
-
- if (!otherMembers.isEmpty()) {
- if (logger.isDebugEnabled()) {
- logger.debug("Sending add cache server profile message to other members.");
- }
- ReplyProcessor21 replyProcessor = new ReplyProcessor21(dm, otherMembers);
- message.setRecipients(otherMembers);
- message.processorId = replyProcessor.getProcessorId();
- dm.putOutgoing(message);
-
- // Wait for replies.
- try {
- replyProcessor.waitForReplies();
- } catch (InterruptedException ignore) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
-
- private void sendRemoveCacheServerProfileMessage() {
- Set<InternalDistributedMember> otherMembers = dm.getOtherDistributionManagerIds();
- RemoveCacheServerProfileMessage message = new RemoveCacheServerProfileMessage();
- message.operateOnLocalCache(this);
-
- // This block prevents sending a message to old members that do not know about
- // the RemoveCacheServerProfileMessage
- otherMembers.removeIf(member -> Version.GEODE_1_5_0.compareTo(member.getVersionObject()) > 0);
-
- if (!otherMembers.isEmpty()) {
- if (logger.isDebugEnabled()) {
- logger.debug("Sending remove cache server profile message to other members.");
- }
- ReplyProcessor21 replyProcessor = new ReplyProcessor21(dm, otherMembers);
- message.setRecipients(otherMembers);
- message.processorId = replyProcessor.getProcessorId();
- dm.putOutgoing(message);
-
- // Wait for replies.
- try {
- replyProcessor.waitForReplies();
- } catch (InterruptedException ignore) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- @Override
- public TXManagerImpl getTxManager() {
- return transactionManager;
- }
-
- @Override
- public <K, V> RegionFactory<K, V> createRegionFactory(RegionShortcut shortcut) {
- throwIfClient();
- return new InternalRegionFactory<>(this, shortcut);
- }
-
- @Override
- public <K, V> RegionFactory<K, V> createRegionFactory() {
- throwIfClient();
- return new InternalRegionFactory<>(this);
- }
-
- @Override
- public <K, V> RegionFactory<K, V> createRegionFactory(String regionAttributesId) {
- throwIfClient();
- return new InternalRegionFactory<>(this, regionAttributesId);
- }
-
- @Override
- public <K, V> RegionFactory<K, V> createRegionFactory(RegionAttributes<K, V> regionAttributes) {
- throwIfClient();
- return new InternalRegionFactory<>(this, regionAttributes);
- }
-
- /**
- * @since GemFire 6.5
- */
- @Override
- public <K, V> ClientRegionFactory<K, V> createClientRegionFactory(ClientRegionShortcut shortcut) {
- return new ClientRegionFactoryImpl<>(this, shortcut);
- }
-
- @Override
- public <K, V> ClientRegionFactory<K, V> createClientRegionFactory(String regionAttributesId) {
- return new ClientRegionFactoryImpl<>(this, regionAttributesId);
- }
-
- @Override
- public QueryService getQueryService(String poolName) {
- Pool pool = PoolManager.find(poolName);
- if (pool == null) {
- throw new IllegalStateException("Could not find a pool named " + poolName);
+ logger.debug("QueryMonitor thread started.");
+ }
+ queryMonitor = tempQueryMonitor;
+ }
+ }
}
- return pool.getQueryService();
+ return tempQueryMonitor;
}
- @Override
- public RegionService createAuthenticatedView(Properties userSecurityProperties) {
- Pool pool = getDefaultPool();
- if (pool == null) {
- throw new IllegalStateException("This cache does not have a default pool");
- }
- return createAuthenticatedCacheView(pool, userSecurityProperties);
- }
+ private void sendAddCacheServerProfileMessage() {
+ Set<InternalDistributedMember> otherMembers = dm.getOtherDistributionManagerIds();
+ AddCacheServerProfileMessage message = new AddCacheServerProfileMessage();
+ message.operateOnLocalCache(this);
- @Override
- public RegionService createAuthenticatedView(Properties userSecurityProperties, String poolName) {
- Pool pool = PoolManager.find(poolName);
- if (pool == null) {
- throw new IllegalStateException("Pool " + poolName + " does not exist");
- }
- return createAuthenticatedCacheView(pool, userSecurityProperties);
- }
+ if (!otherMembers.isEmpty()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Sending add cache server profile message to other members.");
+ }
+ ReplyProcessor21 replyProcessor = new ReplyProcessor21(dm, otherMembers);
+ message.setRecipients(otherMembers);
+ message.processorId = replyProcessor.getProcessorId();
+ dm.putOutgoing(message);
- private static RegionService createAuthenticatedCacheView(Pool pool, Properties properties) {
- if (pool.getMultiuserAuthentication()) {
- return ((PoolImpl) pool).createAuthenticatedCacheView(properties);
+ // Wait for replies.
+ try {
+ replyProcessor.waitForReplies();
+ } catch (InterruptedException ignore) {
+ Thread.currentThread().interrupt();
+ }
}
- throw new IllegalStateException(
- "The pool " + pool.getName() + " did not have multiuser-authentication set to true");
}
- public static void initializeRegionShortcuts(Cache cache) {
- for (RegionShortcut shortcut : RegionShortcut.values()) {
- switch (shortcut) {
- case PARTITION: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.PARTITION);
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
- af.setPartitionAttributes(paf.create());
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case PARTITION_REDUNDANT: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.PARTITION);
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
- paf.setRedundantCopies(1);
- af.setPartitionAttributes(paf.create());
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case PARTITION_PERSISTENT: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
- af.setPartitionAttributes(paf.create());
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case PARTITION_REDUNDANT_PERSISTENT: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
- paf.setRedundantCopies(1);
- af.setPartitionAttributes(paf.create());
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case PARTITION_OVERFLOW: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.PARTITION);
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
- af.setPartitionAttributes(paf.create());
- af.setEvictionAttributes(
- EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case PARTITION_REDUNDANT_OVERFLOW: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.PARTITION);
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
- paf.setRedundantCopies(1);
- af.setPartitionAttributes(paf.create());
- af.setEvictionAttributes(
- EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case PARTITION_PERSISTENT_OVERFLOW: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
- af.setPartitionAttributes(paf.create());
- af.setEvictionAttributes(
- EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case PARTITION_REDUNDANT_PERSISTENT_OVERFLOW: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
- paf.setRedundantCopies(1);
- af.setPartitionAttributes(paf.create());
- af.setEvictionAttributes(
- EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case PARTITION_HEAP_LRU: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.PARTITION);
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
- af.setPartitionAttributes(paf.create());
- af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes());
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case PARTITION_REDUNDANT_HEAP_LRU: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.PARTITION);
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
- paf.setRedundantCopies(1);
- af.setPartitionAttributes(paf.create());
- af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes());
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case REPLICATE: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.REPLICATE);
- af.setScope(Scope.DISTRIBUTED_ACK);
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case REPLICATE_PERSISTENT: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
- af.setScope(Scope.DISTRIBUTED_ACK);
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case REPLICATE_OVERFLOW: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.REPLICATE);
- af.setScope(Scope.DISTRIBUTED_ACK);
- af.setEvictionAttributes(
- EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case REPLICATE_PERSISTENT_OVERFLOW: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
- af.setScope(Scope.DISTRIBUTED_ACK);
- af.setEvictionAttributes(
- EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case REPLICATE_HEAP_LRU: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.REPLICATE);
- af.setScope(Scope.DISTRIBUTED_ACK);
- af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes());
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case LOCAL: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.NORMAL);
- af.setScope(Scope.LOCAL);
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case LOCAL_PERSISTENT: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
- af.setScope(Scope.LOCAL);
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case LOCAL_HEAP_LRU: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.NORMAL);
- af.setScope(Scope.LOCAL);
- af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes());
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case LOCAL_OVERFLOW: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.NORMAL);
- af.setScope(Scope.LOCAL);
- af.setEvictionAttributes(
- EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case LOCAL_PERSISTENT_OVERFLOW: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
- af.setScope(Scope.LOCAL);
- af.setEvictionAttributes(
- EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case PARTITION_PROXY: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.PARTITION);
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
- paf.setLocalMaxMemory(0);
- af.setPartitionAttributes(paf.create());
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case PARTITION_PROXY_REDUNDANT: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.PARTITION);
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
- paf.setLocalMaxMemory(0);
- paf.setRedundantCopies(1);
- af.setPartitionAttributes(paf.create());
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case REPLICATE_PROXY: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.EMPTY);
- af.setScope(Scope.DISTRIBUTED_ACK);
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- default:
- throw new IllegalStateException("unhandled enum " + shortcut);
+ private void sendRemoveCacheServerProfileMessage() {
+ Set<InternalDistributedMember> otherMembers = dm.getOtherDistributionManagerIds();
+ RemoveCacheServerProfileMessage message = new RemoveCacheServerProfileMessage();
+ message.operateOnLocalCache(this);
+
+ // This block prevents sending a message to old members that do not know about
+ // the RemoveCacheServerProfileMessage
+ otherMembers.removeIf(member -> Version.GEODE_1_5_0.compareTo(member.getVersionObject()) > 0);
+
+ if (!otherMembers.isEmpty()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Sending remove cache server profile message to other members.");
+ }
+ ReplyProcessor21 replyProcessor = new ReplyProcessor21(dm, otherMembers);
+ message.setRecipients(otherMembers);
+ message.processorId = replyProcessor.getProcessorId();
+ dm.putOutgoing(message);
+
+ // Wait for replies.
+ try {
+ replyProcessor.waitForReplies();
+ } catch (InterruptedException ignore) {
+ Thread.currentThread().interrupt();
}
}
}
- public static void initializeClientRegionShortcuts(Cache cache) {
- for (ClientRegionShortcut shortcut : ClientRegionShortcut.values()) {
- switch (shortcut) {
- case LOCAL: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.NORMAL);
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case LOCAL_PERSISTENT: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case LOCAL_HEAP_LRU: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.NORMAL);
- af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes());
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case LOCAL_OVERFLOW: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.NORMAL);
- af.setEvictionAttributes(
- EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case LOCAL_PERSISTENT_OVERFLOW: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
- af.setEvictionAttributes(
- EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
- cache.setRegionAttributes(shortcut.toString(), af.create());
- break;
- }
- case PROXY: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.EMPTY);
- UserSpecifiedRegionAttributes<?, ?> attributes =
- (UserSpecifiedRegionAttributes) af.create();
- attributes.requiresPoolName = true;
- cache.setRegionAttributes(shortcut.toString(), attributes);
- break;
- }
- case CACHING_PROXY: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.NORMAL);
- UserSpecifiedRegionAttributes<?, ?> attributes =
- (UserSpecifiedRegionAttributes) af.create();
- attributes.requiresPoolName = true;
- cache.setRegionAttributes(shortcut.toString(), attributes);
- break;
- }
- case CACHING_PROXY_HEAP_LRU: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.NORMAL);
- af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes());
- UserSpecifiedRegionAttributes<?, ?> attributes =
- (UserSpecifiedRegionAttributes) af.create();
- attributes.requiresPoolName = true;
- cache.setRegionAttributes(shortcut.toString(), attributes);
- break;
- }
- case CACHING_PROXY_OVERFLOW: {
- AttributesFactory<?, ?> af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.NORMAL);
- af.setEvictionAttributes(
- EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
- UserSpecifiedRegionAttributes<?, ?> attributes =
- (UserSpecifiedRegionAttributes) af.create();
- attributes.requiresPoolName = true;
- cache.setRegionAttributes(shortcut.toString(), attributes);
- break;
- }
- default:
- throw new IllegalStateException("unhandled enum " + shortcut);
- }
+ @Override
+ public TXManagerImpl getTxManager() {
+ return transactionManager;
+ }
+
+ @Override
+ public <K, V> RegionFactory<K, V> createRegionFactory(RegionShortcut shortcut) {
+ throwIfClient();
+ return new InternalRegionFactory<>(this, shortcut);
+ }
+
+ @Override
+ public <K, V> RegionFactory<K, V> createRegionFactory() {
+ throwIfClient();
+ return new InternalRegionFactory<>(this);
+ }
+
+ @Override
+ public <K, V> RegionFactory<K, V> createRegionFactory(String regionAttributesId) {
+ throwIfClient();
+ return new InternalRegionFactory<>(this, regionAttributesId);
+ }
+
+ @Override
+ public <K, V> RegionFactory<K, V> createRegionFactory(RegionAttributes<K, V> regionAttributes) {
+ throwIfClient();
+ return new InternalRegionFactory<>(this, regionAttributes);
+ }
+
+ /**
+ * @since GemFire 6.5
+ */
+ @Override
+ public <K, V> ClientRegionFactory<K, V> createClientRegionFactory(ClientRegionShortcut shortcut) {
+ return new ClientRegionFactoryImpl<>(this, shortcut);
+ }
+
+ @Override
+ public <K, V> ClientRegionFactory<K, V> createClientRegionFactory(String regionAttributesId) {
+ return new ClientRegionFactoryImpl<>(this, regionAttributesId);
+ }
+
+ @Override
+ public QueryService getQueryService(String poolName) {
+ Pool pool = PoolManager.find(poolName);
+ if (pool == null) {
+ throw new IllegalStateException("Could not find a pool named " + poolName);
+ }
+ return pool.getQueryService();
+ }
+
+ @Override
+ public RegionService createAuthenticatedView(Properties userSecurityProperties) {
+ Pool pool = getDefaultPool();
+ if (pool == null) {
+ throw new IllegalStateException("This cache does not have a default pool");
+ }
+ return createAuthenticatedCacheView(pool, userSecurityProperties);
+ }
+
+ @Override
+ public RegionService createAuthenticatedView(Properties userSecurityProperties, String poolName) {
+ Pool pool = PoolManager.find(poolName);
+ if (pool == null) {
+ throw new IllegalStateException("Pool " + poolName + " does not exist");
}
+ return createAuthenticatedCacheView(pool, userSecurityProperties);
}
@Override
@@ -4834,6 +4734,12 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
return cacheConfig.pdxSerializer;
}
+ @VisibleForTesting
+ public void setPdxSerializer(PdxSerializer serializer) {
+ cacheConfig.setPdxSerializer(serializer);
+ basicSetPdxSerializer(serializer);
+ }
+
@Override
public String getPdxDiskStore() {
return cacheConfig.pdxDiskStore;
@@ -4931,12 +4837,6 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
return txEntryStateFactory;
}
- @VisibleForTesting
- public void setPdxSerializer(PdxSerializer serializer) {
- cacheConfig.setPdxSerializer(serializer);
- basicSetPdxSerializer(serializer);
- }
-
private void basicSetPdxSerializer(PdxSerializer serializer) {
typeRegistrySetPdxSerializer.accept(serializer);
if (serializer instanceof ReflectionBasedAutoSerializer) {
@@ -5161,9 +5061,48 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
return statisticsClock;
}
+ @FunctionalInterface
@VisibleForTesting
- void setDisconnectCause(Throwable disconnectCause) {
- this.disconnectCause = disconnectCause;
+ interface InternalCqServiceFactory {
+ CqService create(InternalCache internalCache, ModuleService moduleService);
+ }
+
+ @FunctionalInterface
+ @VisibleForTesting
+ interface TXManagerImplFactory {
+ TXManagerImpl create(CachePerfStats cachePerfStats, InternalCache cache,
+ StatisticsClock statisticsClock);
+ }
+
+ @FunctionalInterface
+ @VisibleForTesting
+ interface InternalSecurityServiceFactory {
+ SecurityService create(Properties properties, CacheConfig cacheConfig);
+ }
+
+ @FunctionalInterface
+ @VisibleForTesting
+ interface CachePerfStatsFactory {
+ CachePerfStats create(StatisticsFactory factory, StatisticsClock clock);
+ }
+
+ @FunctionalInterface
+ @VisibleForTesting
+ interface TypeRegistryFactory {
+ TypeRegistry create(InternalCache cache, boolean disableTypeRegistry);
+ }
+
+ @FunctionalInterface
+ @VisibleForTesting
+ interface HeapEvictorFactory {
+ HeapEvictor create(InternalCache cache, StatisticsClock statisticsClock);
+ }
+
+ @FunctionalInterface
+ @VisibleForTesting
+ interface ReplyProcessor21Factory {
+ ReplyProcessor21 create(InternalDistributedSystem system,
+ Collection<InternalDistributedMember> initMembers);
}
private class Stopper extends CancelCriterion {
@@ -5249,42 +5188,4 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
}
}
}
-
- @FunctionalInterface
- @VisibleForTesting
- interface TXManagerImplFactory {
- TXManagerImpl create(CachePerfStats cachePerfStats, InternalCache cache,
- StatisticsClock statisticsClock);
- }
-
- @FunctionalInterface
- @VisibleForTesting
- interface InternalSecurityServiceFactory {
- SecurityService create(Properties properties, CacheConfig cacheConfig);
- }
-
- @FunctionalInterface
- @VisibleForTesting
- interface CachePerfStatsFactory {
- CachePerfStats create(StatisticsFactory factory, StatisticsClock clock);
- }
-
- @FunctionalInterface
- @VisibleForTesting
- interface TypeRegistryFactory {
- TypeRegistry create(InternalCache cache, boolean disableTypeRegistry);
- }
-
- @FunctionalInterface
- @VisibleForTesting
- interface HeapEvictorFactory {
- HeapEvictor create(InternalCache cache, StatisticsClock statisticsClock);
- }
-
- @FunctionalInterface
- @VisibleForTesting
- interface ReplyProcessor21Factory {
- ReplyProcessor21 create(InternalDistributedSystem system,
- Collection<InternalDistributedMember> initMembers);
- }
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplCloseTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplCloseTest.java
index c11a385..49d6569 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplCloseTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplCloseTest.java
@@ -233,7 +233,7 @@ public class GemFireCacheImplCloseTest {
(properties, cacheConfigArg) -> mock(SecurityService.class),
() -> true,
mock(Function.class),
- mock(Function.class),
+ mock(GemFireCacheImpl.InternalCqServiceFactory.class),
(factory, clock) -> mock(CachePerfStats.class),
mock(GemFireCacheImpl.TXManagerImplFactory.class),
mock(Supplier.class),
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
index 9fbae78..fd08029 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
@@ -675,7 +675,7 @@ public class GemFireCacheImplTest {
(properties, cacheConfigArg) -> mock(SecurityService.class),
() -> true,
mock(Function.class),
- mock(Function.class),
+ mock(GemFireCacheImpl.InternalCqServiceFactory.class),
(factory, clock) -> mock(CachePerfStats.class),
mock(GemFireCacheImpl.TXManagerImplFactory.class),
mock(Supplier.class),