You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/04/20 18:00:04 UTC
[1/3] geode git commit: 2632: 1st pass cleaning up GemFireCacheImpl
[Forced Update!]
Repository: geode
Updated Branches:
refs/heads/feature/GEODE-2632-6 c64994713 -> 625e71b3e (forced update)
http://git-wip-us.apache.org/repos/asf/geode/blob/625e71b3/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
index 709308b..9e8a0b7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
@@ -76,7 +76,7 @@ public interface InternalCache extends Cache, Extensible<Cache> {
FilterProfile getFilterProfile(String regionName);
- Region getRegion(String path, boolean returnDestroyedRegion);
+ <K, V> Region<K, V> getRegion(String path, boolean returnDestroyedRegion);
MemoryAllocator getOffHeapStore();
http://git-wip-us.apache.org/repos/asf/geode/blob/625e71b3/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
index a5f0fc2..d80fe16 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
@@ -1727,7 +1727,7 @@ public class CacheCreation implements InternalCache {
}
@Override
- public Region getRegion(final String path, final boolean returnDestroyedRegion) {
+ public <K, V> Region<K, V> getRegion(final String path, final boolean returnDestroyedRegion) {
throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString());
}
[2/3] geode git commit: 2632: 1st pass cleaning up GemFireCacheImpl
Posted by kl...@apache.org.
http://git-wip-us.apache.org/repos/asf/geode/blob/625e71b3/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 56243e1..ed66ae9 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -12,12 +12,66 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintStream;
+import java.io.Reader;
+import java.io.StringBufferInputStream;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.naming.Context;
+
import com.sun.jna.Native;
import com.sun.jna.Platform;
import org.apache.commons.lang.StringUtils;
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.ForcedDisconnectException;
@@ -89,6 +143,7 @@ import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.CacheTime;
+import org.apache.geode.distributed.internal.ClusterConfigurationService;
import org.apache.geode.distributed.internal.DM;
import org.apache.geode.distributed.internal.DistributionAdvisee;
import org.apache.geode.distributed.internal.DistributionAdvisor;
@@ -103,7 +158,6 @@ import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.ResourceEvent;
import org.apache.geode.distributed.internal.ResourceEventsListener;
import org.apache.geode.distributed.internal.ServerLocation;
-import org.apache.geode.distributed.internal.ClusterConfigurationService;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.i18n.LogWriterI18n;
@@ -125,7 +179,6 @@ import org.apache.geode.internal.cache.partitioned.RedundancyAlreadyMetException
import org.apache.geode.internal.cache.persistence.BackupManager;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.cache.persistence.PersistentMemberManager;
-import org.apache.geode.internal.cache.persistence.query.TemporaryResultSetFactory;
import org.apache.geode.internal.cache.snapshot.CacheSnapshotServiceImpl;
import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
@@ -173,61 +226,6 @@ import org.apache.geode.pdx.internal.PdxInstanceFactoryImpl;
import org.apache.geode.pdx.internal.PdxInstanceImpl;
import org.apache.geode.pdx.internal.TypeRegistry;
import org.apache.geode.redis.GeodeRedisServer;
-import org.apache.logging.log4j.Logger;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.PrintStream;
-import java.io.Reader;
-import java.io.StringBufferInputStream;
-import java.io.StringWriter;
-import java.io.Writer;
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.ServiceLoader;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.naming.Context;
// TODO: somebody Come up with more reasonable values for {@link #DEFAULT_LOCK_TIMEOUT}, etc.
/**
@@ -238,23 +236,22 @@ public class GemFireCacheImpl
implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee, CacheTime {
private static final Logger logger = LogService.getLogger();
- // moved *SERIAL_NUMBER stuff to DistributionAdvisor
-
/** The default number of seconds to wait for a distributed lock */
- public static final int DEFAULT_LOCK_TIMEOUT = Integer
- .getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.defaultLockTimeout", 60).intValue();
+ public static final int DEFAULT_LOCK_TIMEOUT =
+ Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.defaultLockTimeout", 60);
/**
* The default duration (in seconds) of a lease on a distributed lock
*/
- public static final int DEFAULT_LOCK_LEASE = Integer
- .getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.defaultLockLease", 120).intValue();
+ public static final int DEFAULT_LOCK_LEASE =
+ Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.defaultLockLease", 120);
/** The default "copy on read" attribute value */
public static final boolean DEFAULT_COPY_ON_READ = false;
/** the last instance of GemFireCache created */
private static volatile GemFireCacheImpl instance = null;
+
/**
* Just like instance but is valid for a bit longer so that pdx can still find the cache during a
* close.
@@ -264,14 +261,13 @@ public class GemFireCacheImpl
/**
* The default amount of time to wait for a <code>netSearch</code> to complete
*/
- public static final int DEFAULT_SEARCH_TIMEOUT = Integer
- .getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.defaultSearchTimeout", 300).intValue();
+ public static final int DEFAULT_SEARCH_TIMEOUT =
+ Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.defaultSearchTimeout", 300);
/**
* The <code>CacheLifecycleListener</code> s that have been registered in this VM
*/
- private static final Set<CacheLifecycleListener> cacheLifecycleListeners =
- new HashSet<CacheLifecycleListener>();
+ private static final Set<CacheLifecycleListener> cacheLifecycleListeners = new HashSet<>();
/**
* Define gemfire.Cache.ASYNC_EVENT_LISTENERS=true to invoke event listeners in the background
@@ -286,32 +282,32 @@ public class GemFireCacheImpl
*
* @since GemFire hitachi 6.1.2.9
*/
- public static boolean DELTAS_RECALCULATE_SIZE =
+ static boolean DELTAS_RECALCULATE_SIZE =
Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "DELTAS_RECALCULATE_SIZE");
- public static final int EVENT_QUEUE_LIMIT = Integer
- .getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.EVENT_QUEUE_LIMIT", 4096).intValue();
- public static final int EVENT_THREAD_LIMIT = Integer
- .getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.EVENT_THREAD_LIMIT", 16).intValue();
+ private static final int EVENT_QUEUE_LIMIT =
+ Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.EVENT_QUEUE_LIMIT", 4096);
+
+ static final int EVENT_THREAD_LIMIT =
+ Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.EVENT_THREAD_LIMIT", 16);
/**
* System property to limit the max query-execution time. By default its turned off (-1), the time
* is set in MiliSecs.
*/
public static final int MAX_QUERY_EXECUTION_TIME =
- Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.MAX_QUERY_EXECUTION_TIME", -1)
- .intValue();
+ Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.MAX_QUERY_EXECUTION_TIME", -1);
/**
* System property to disable query monitor even if resource manager is in use
*/
- public final boolean QUERY_MONITOR_DISABLED_FOR_LOW_MEM = Boolean
+ private final boolean QUERY_MONITOR_DISABLED_FOR_LOW_MEM = Boolean
.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "Cache.DISABLE_QUERY_MONITOR_FOR_LOW_MEMORY");
/**
* Property set to true if resource manager heap percentage is set and query monitor is required
*/
- public static Boolean QUERY_MONITOR_REQUIRED_FOR_RESOURCE_MANAGER = Boolean.FALSE;
+ private static Boolean QUERY_MONITOR_REQUIRED_FOR_RESOURCE_MANAGER = Boolean.FALSE;
/**
* This property defines internal function that will get executed on each node to fetch active
@@ -328,11 +324,11 @@ public class GemFireCacheImpl
// time in ms
private static final int FIVE_HOURS = 5 * 60 * 60 * 1000;
+
/** To test MAX_QUERY_EXECUTION_TIME option. */
public int TEST_MAX_QUERY_EXECUTION_TIME = -1;
- public boolean TEST_MAX_QUERY_EXECUTION_TIME_OVERRIDE_EXCEPTION = false;
- // ///////////////////// Instance Fields ///////////////////////
+ public boolean TEST_MAX_QUERY_EXECUTION_TIME_OVERRIDE_EXCEPTION = false;
private final InternalDistributedSystem system;
@@ -340,24 +336,24 @@ public class GemFireCacheImpl
// This is a HashMap because I know that clear() on it does
// not allocate objects.
- private final HashMap rootRegions;
+ private final HashMap<String, LocalRegion> rootRegions;
/**
* True if this cache is being created by a ClientCacheFactory.
*/
private final boolean isClient;
- private PoolFactory clientpf;
+
+ private PoolFactory clientPoolFactory;
+
/**
* It is not final to allow cache.xml parsing to set it.
*/
private Pool defaultPool;
- private final ConcurrentMap pathToRegion = new ConcurrentHashMap();
+ private final ConcurrentMap<String, Region> pathToRegion = new ConcurrentHashMap<>();
protected volatile boolean isInitialized = false;
protected volatile boolean isClosing = false;
- protected volatile boolean closingGatewaySendersByShutdownAll = false;
- protected volatile boolean closingGatewayReceiversByShutdownAll = false;
/** Amount of time (in seconds) to wait for a distributed lock */
private int lockTimeout = DEFAULT_LOCK_TIMEOUT;
@@ -381,14 +377,14 @@ public class GemFireCacheImpl
* retrieval operations. It is assumed that the traversal operations on cache servers list vastly
* outnumber the mutative operations such as add, remove.
*/
- private volatile List allCacheServers = new CopyOnWriteArrayList();
+ private volatile List<CacheServerImpl> allCacheServers = new CopyOnWriteArrayList<>();
/**
* Controls updates to the list of all gateway senders
*
* @see #allGatewaySenders
*/
- public final Object allGatewaySendersLock = new Object();
+ private final Object allGatewaySendersLock = new Object();
/**
* the set of all gateway senders. It may be fetched safely (for enumeration), but updates must by
@@ -400,22 +396,20 @@ public class GemFireCacheImpl
* The list of all async event queues added to the cache. CopyOnWriteArrayList is used to allow
* concurrent add, remove and retrieval operations.
*/
- private volatile Set<AsyncEventQueue> allVisibleAsyncEventQueues =
- new CopyOnWriteArraySet<AsyncEventQueue>();
+ private volatile Set<AsyncEventQueue> allVisibleAsyncEventQueues = new CopyOnWriteArraySet<>();
/**
* The list of all async event queues added to the cache. CopyOnWriteArrayList is used to allow
* concurrent add, remove and retrieval operations.
*/
- private volatile Set<AsyncEventQueue> allAsyncEventQueues =
- new CopyOnWriteArraySet<AsyncEventQueue>();
+ private volatile Set<AsyncEventQueue> allAsyncEventQueues = new CopyOnWriteArraySet<>();
/**
* Controls updates to the list of all gateway receivers
*
* @see #allGatewayReceivers
*/
- public final Object allGatewayReceiversLock = new Object();
+ private final Object allGatewayReceiversLock = new Object();
/**
* the list of all gateway Receivers. It may be fetched safely (for enumeration), but updates must
@@ -423,10 +417,12 @@ public class GemFireCacheImpl
*/
private volatile Set<GatewayReceiver> allGatewayReceivers = Collections.emptySet();
- /** PartitionedRegion instances (for required-events notification */
- // This is a HashSet because I know that clear() on it does not
- // allocate any objects.
- private final HashSet<PartitionedRegion> partitionedRegions = new HashSet<PartitionedRegion>();
+ /**
+ * PartitionedRegion instances (for required-events notification
+ * </p>
+ * This is a HashSet because I know that clear() on it does not allocate any objects.
+ */
+ private final HashSet<PartitionedRegion> partitionedRegions = new HashSet<>();
/**
* Fix for 42051 This is a map of regions that are in the process of being destroyed. We could
@@ -436,14 +432,14 @@ public class GemFireCacheImpl
* that ID if it receives it as part of the persistent view.
*/
private final ConcurrentMap<String, DistributedRegion> regionsInDestroy =
- new ConcurrentHashMap<String, DistributedRegion>();
+ new ConcurrentHashMap<>();
- public final Object allGatewayHubsLock = new Object();
+ private final Object allGatewayHubsLock = new Object();
/**
* conflict resolver for WAN, if any
*
- * @guarded.By {@link #allGatewayHubsLock}
+ * GuardedBy {@link #allGatewayHubsLock}
*/
private GatewayConflictResolver gatewayConflictResolver;
@@ -461,7 +457,8 @@ public class GemFireCacheImpl
private volatile boolean copyOnRead = DEFAULT_COPY_ON_READ;
/** The named region attributes registered with this cache. */
- private final Map namedRegionAttributes = Collections.synchronizedMap(new HashMap());
+ private final Map<String, RegionAttributes> namedRegionAttributes =
+ Collections.synchronizedMap(new HashMap<>());
/**
* if this cache was forced to close due to a forced-disconnect, we retain a
@@ -473,10 +470,10 @@ public class GemFireCacheImpl
* if this cache was forced to close due to a forced-disconnect or system failure, this keeps
* track of the reason
*/
- protected volatile Throwable disconnectCause = null;
+ private volatile Throwable disconnectCause = null;
/** context where this cache was created -- for debugging, really... */
- public Exception creationStack = null;
+ private Exception creationStack = null;
/**
* a system timer task for cleaning up old bridge thread event entries
@@ -490,7 +487,7 @@ public class GemFireCacheImpl
* is created. Destroyed by GemFireCache when closing the cache. Protected by synchronization on
* this GemFireCache.
*
- * @guarded.By prLockServiceLock
+ * GuardedBy prLockServiceLock
*/
private DistributedLockService prLockService;
@@ -503,7 +500,7 @@ public class GemFireCacheImpl
* DistributedLockService for GatewaySenders. Remains null until the first GatewaySender is
* created. Destroyed by GemFireCache when closing the cache.
*
- * @guarded.By gatewayLockServiceLock
+ * GuardedBy gatewayLockServiceLock
*/
private volatile DistributedLockService gatewayLockService;
@@ -514,7 +511,7 @@ public class GemFireCacheImpl
private final InternalResourceManager resourceManager;
- private final AtomicReference<BackupManager> backupManager = new AtomicReference<BackupManager>();
+ private final AtomicReference<BackupManager> backupManager = new AtomicReference<>();
private HeapEvictor heapEvictor = null;
@@ -558,13 +555,12 @@ public class GemFireCacheImpl
private final DiskStoreMonitor diskMonitor;
// Stores the properties used to initialize declarables.
- private final Map<Declarable, Properties> declarablePropertiesMap =
- new ConcurrentHashMap<Declarable, Properties>();
+ private final Map<Declarable, Properties> declarablePropertiesMap = new ConcurrentHashMap<>();
/** {@link PropertyResolver} to resolve ${} type property strings */
protected static PropertyResolver resolver;
- protected static boolean xmlParameterizationEnabled =
+ private static boolean xmlParameterizationEnabled =
!Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "xml.parameterization.disabled");
public static Runnable internalBeforeApplyChanges;
@@ -587,22 +583,18 @@ public class GemFireCacheImpl
*
* @since GemFire 8.1
*/
- private SimpleExtensionPoint<Cache> extensionPoint = new SimpleExtensionPoint<Cache>(this, this);
+ private SimpleExtensionPoint<Cache> extensionPoint = new SimpleExtensionPoint<>(this, this);
private final CqService cqService;
- private final Set<RegionListener> regionListeners = new ConcurrentHashSet<RegionListener>();
+ private final Set<RegionListener> regionListeners = new ConcurrentHashSet<>();
- private final Map<Class<? extends CacheService>, CacheService> services =
- new HashMap<Class<? extends CacheService>, CacheService>();
+ private final Map<Class<? extends CacheService>, CacheService> services = new HashMap<>();
public static final int DEFAULT_CLIENT_FUNCTION_TIMEOUT = 0;
private static int clientFunctionTimeout;
- private final static Boolean DISABLE_AUTO_EVICTION =
- Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "disableAutoEviction");
-
private static SecurityService securityService = SecurityService.getSecurityService();
static {
@@ -629,15 +621,13 @@ public class GemFireCacheImpl
* and stack as well as new memory mapped files or shared memory regions.
*
* @return 0 if success, non-zero if error and errno set
- *
*/
private static native int mlockall(int flags);
public static void lockMemory() {
- int result = 0;
try {
Native.register(Platform.C_LIBRARY_NAME);
- result = mlockall(1);
+ int result = mlockall(1);
if (result == 0) {
return;
}
@@ -663,14 +653,14 @@ public class GemFireCacheImpl
public String toString() {
final StringBuffer sb = new StringBuffer();
sb.append("GemFireCache[");
- sb.append("id = " + System.identityHashCode(this));
- sb.append("; isClosing = " + this.isClosing);
- sb.append("; isShutDownAll = " + isCacheAtShutdownAll());
- sb.append("; created = " + this.creationDate);
- sb.append("; server = " + this.isServer);
- sb.append("; copyOnRead = " + this.copyOnRead);
- sb.append("; lockLease = " + this.lockLease);
- sb.append("; lockTimeout = " + this.lockTimeout);
+ sb.append("id = ").append(System.identityHashCode(this));
+ sb.append("; isClosing = ").append(this.isClosing);
+ sb.append("; isShutDownAll = ").append(isCacheAtShutdownAll());
+ sb.append("; created = ").append(this.creationDate);
+ sb.append("; server = ").append(this.isServer);
+ sb.append("; copyOnRead = ").append(this.copyOnRead);
+ sb.append("; lockLease = ").append(this.lockLease);
+ sb.append("; lockTimeout = ").append(this.lockTimeout);
if (this.creationStack != null) {
sb.append("\nCreation context:\n");
OutputStream os = new OutputStream() {
@@ -709,7 +699,7 @@ public class GemFireCacheImpl
* @return the existing cache
* @throws CacheClosedException if an existing cache can not be found.
*/
- public static final GemFireCacheImpl getExisting() {
+ public static GemFireCacheImpl getExisting() {
final GemFireCacheImpl result = instance;
if (result != null && !result.isClosing) {
return result;
@@ -748,15 +738,6 @@ public class GemFireCacheImpl
return result;
}
- // /**
- // * @deprecated remove when Lise allows a Hydra VM to
- // * be re-created
- // */
- // public static void clearInstance() {
- // System.err.println("DEBUG: do not commit GemFireCache#clearInstance");
- // instance = null;
- // }
-
public static GemFireCacheImpl createClient(DistributedSystem system, PoolFactory pf,
CacheConfig cacheConfig) {
return basicCreate(system, true, cacheConfig, pf, true, ASYNC_EVENT_LISTENERS, null);
@@ -766,7 +747,7 @@ public class GemFireCacheImpl
return basicCreate(system, true, cacheConfig, null, false, ASYNC_EVENT_LISTENERS, null);
}
- public static GemFireCacheImpl createWithAsyncEventListeners(DistributedSystem system,
+ static GemFireCacheImpl createWithAsyncEventListeners(DistributedSystem system,
CacheConfig cacheConfig, TypeRegistry typeRegistry) {
return basicCreate(system, true, cacheConfig, null, false, true, typeRegistry);
}
@@ -776,8 +757,6 @@ public class GemFireCacheImpl
return basicCreate(system, existingOk, cacheConfig, null, false, ASYNC_EVENT_LISTENERS, null);
}
-
-
private static GemFireCacheImpl basicCreate(DistributedSystem system, boolean existingOk,
CacheConfig cacheConfig, PoolFactory pf, boolean isClient, boolean asyncEventListeners,
TypeRegistry typeRegistry) throws CacheExistsException, TimeoutException,
@@ -829,7 +808,7 @@ public class GemFireCacheImpl
private GemFireCacheImpl(boolean isClient, PoolFactory pf, DistributedSystem system,
CacheConfig cacheConfig, boolean asyncEventListeners, TypeRegistry typeRegistry) {
this.isClient = isClient;
- this.clientpf = pf;
+ this.clientPoolFactory = pf;
this.cacheConfig = cacheConfig; // do early for bug 43213
this.pdxRegistry = typeRegistry;
@@ -867,7 +846,7 @@ public class GemFireCacheImpl
}
}
- this.rootRegions = new HashMap();
+ this.rootRegions = new HashMap<>();
this.cqService = CqServiceProvider.create(this);
@@ -883,26 +862,26 @@ public class GemFireCacheImpl
this.persistentMemberManager = new PersistentMemberManager();
if (asyncEventListeners) {
- final ThreadGroup group =
+ final ThreadGroup threadGroup =
LoggingThreadGroup.createThreadGroup("Message Event Threads", logger);
- ThreadFactory tf = new ThreadFactory() {
+ ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(final Runnable command) {
- final Runnable r = new Runnable() {
+ final Runnable runnable = new Runnable() {
@Override
public void run() {
ConnectionTable.threadWantsSharedResources();
command.run();
}
};
- Thread thread = new Thread(group, r, "Message Event Thread");
+ Thread thread = new Thread(threadGroup, runnable, "Message Event Thread");
thread.setDaemon(true);
return thread;
}
};
- ArrayBlockingQueue q = new ArrayBlockingQueue(EVENT_QUEUE_LIMIT);
- this.eventThreadPool = new PooledExecutorWithDMStats(q, EVENT_THREAD_LIMIT,
- this.cachePerfStats.getEventPoolHelper(), tf, 1000);
+ ArrayBlockingQueue queue = new ArrayBlockingQueue(EVENT_QUEUE_LIMIT);
+ this.eventThreadPool = new PooledExecutorWithDMStats(queue, EVENT_THREAD_LIMIT,
+ this.cachePerfStats.getEventPoolHelper(), threadFactory, 1000);
} else {
this.eventThreadPool = null;
}
@@ -949,7 +928,7 @@ public class GemFireCacheImpl
this.txEntryStateFactory = TXEntryState.getFactory();
if (xmlParameterizationEnabled) {
- /** If product properties file is available replace properties from there */
+ // If product properties file is available replace properties from there
Properties userProps = this.system.getConfig().getUserDefinedProps();
if (userProps != null && !userProps.isEmpty()) {
resolver = new CacheXmlPropertyResolver(false,
@@ -983,11 +962,11 @@ public class GemFireCacheImpl
return restAgent;
}
- /*****
+ /**
* Request the shared configuration from the locator(s) which have the Cluster config service
* running
*/
- public ConfigurationResponse requestSharedConfiguration() {
+ private ConfigurationResponse requestSharedConfiguration() {
final DistributionConfig config = this.system.getConfig();
if (!(dm instanceof DistributionManager))
@@ -1015,13 +994,11 @@ public class GemFireCacheImpl
return null;
}
-
- ConfigurationResponse response = null;
List<String> locatorConnectionStrings = getSharedConfigLocatorConnectionStringList();
try {
- response = ClusterConfigurationLoader.requestConfigurationFromLocators(system.getConfig(),
- locatorConnectionStrings);
+ ConfigurationResponse response = ClusterConfigurationLoader
+ .requestConfigurationFromLocators(system.getConfig(), locatorConnectionStrings);
// log the configuration received from the locator
logger.info(LocalizedMessage
@@ -1064,15 +1041,10 @@ public class GemFireCacheImpl
}
}
- public void deployJarsRecevedFromClusterConfiguration(ConfigurationResponse response) {
+ private void deployJarsRecevedFromClusterConfiguration(ConfigurationResponse response) {
try {
ClusterConfigurationLoader.deployJarsReceivedFromClusterConfiguration(this, response);
- } catch (IOException e) {
- throw new GemFireConfigException(
- LocalizedStrings.GemFireCache_EXCEPTION_OCCURED_WHILE_DEPLOYING_JARS_FROM_SHARED_CONDFIGURATION
- .toLocalizedString(),
- e);
- } catch (ClassNotFoundException e) {
+ } catch (IOException | ClassNotFoundException e) {
throw new GemFireConfigException(
LocalizedStrings.GemFireCache_EXCEPTION_OCCURED_WHILE_DEPLOYING_JARS_FROM_SHARED_CONDFIGURATION
.toLocalizedString(),
@@ -1080,10 +1052,10 @@ public class GemFireCacheImpl
}
}
-
- // When called, clusterProps and serverProps and key could not be null
- public static boolean isMisConfigured(Properties clusterProps, Properties serverProps,
- String key) {
+ /**
+ * When called, clusterProps and serverProps and key could not be null
+ */
+ static boolean isMisConfigured(Properties clusterProps, Properties serverProps, String key) {
String clusterPropValue = clusterProps.getProperty(key);
String serverPropValue = serverProps.getProperty(key);
@@ -1099,8 +1071,8 @@ public class GemFireCacheImpl
return !clusterPropValue.equals(serverPropValue);
}
- public List<String> getSharedConfigLocatorConnectionStringList() {
- List<String> locatorConnectionStringList = new ArrayList<String>();
+ private List<String> getSharedConfigLocatorConnectionStringList() {
+ List<String> locatorConnectionStringList = new ArrayList<>();
Map<InternalDistributedMember, Collection<String>> scl =
this.getDistributionManager().getAllHostedLocatorsWithSharedConfiguration();
@@ -1113,18 +1085,12 @@ public class GemFireCacheImpl
for (Entry<InternalDistributedMember, Collection<String>> loc : locs) {
Collection<String> locStrings = loc.getValue();
- Iterator<String> locStringIter = locStrings.iterator();
-
- while (locStringIter.hasNext()) {
- locatorConnectionStringList.add(locStringIter.next());
- }
+ locatorConnectionStringList.addAll(locStrings);
}
}
return locatorConnectionStringList;
}
-
-
/**
* Used by unit tests to force cache creation to use a test generated cache.xml
*/
@@ -1184,9 +1150,7 @@ public class GemFireCacheImpl
GemFireCacheImpl.instance = this;
GemFireCacheImpl.pdxInstance = this;
- for (Iterator<CacheLifecycleListener> iter = cacheLifecycleListeners.iterator(); iter
- .hasNext();) {
- CacheLifecycleListener listener = (CacheLifecycleListener) iter.next();
+ for (CacheLifecycleListener listener : cacheLifecycleListeners) {
listener.cacheCreated(this);
}
@@ -1211,7 +1175,6 @@ public class GemFireCacheImpl
securityService.setPostProcessor(cacheConfig.getPostProcessor());
}
-
SystemMemberCacheEventProcessor.send(this, Operation.CACHE_CREATE);
this.resourceAdvisor.initializationGate();
@@ -1225,8 +1188,7 @@ public class GemFireCacheImpl
// we will not be ready for all the events that cache.xml
// processing can deliver (region creation, etc.).
// This call may need to be moved inside initializeDeclarativeCache.
- /** Entry to GemFire Management service **/
- this.jmxAdvisor.initializationGate();
+ this.jmxAdvisor.initializationGate(); // Entry to GemFire Management service
// this starts up the ManagementService, register and federate the internal beans
system.handleResourceEvent(ResourceEvent.CACHE_CREATE, this);
@@ -1256,7 +1218,7 @@ public class GemFireCacheImpl
}
}
- this.clientpf = null;
+ this.clientPoolFactory = null;
startColocatedJmxManagerLocator();
@@ -1287,7 +1249,7 @@ public class GemFireCacheImpl
}
private boolean isNotJmxManager() {
- return (this.system.getConfig().getJmxManagerStart() != true);
+ return !this.system.getConfig().getJmxManagerStart();
}
private boolean isServerNode() {
@@ -1346,7 +1308,6 @@ public class GemFireCacheImpl
}
}
-
@Override
public URL getCacheXmlURL() {
if (this.getMyId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
@@ -1360,7 +1321,7 @@ public class GemFireCacheImpl
return null;
}
- URL url = null;
+ URL url;
if (!xmlFile.exists() || !xmlFile.isFile()) {
// do a resource search
String resource = xmlFile.getPath();
@@ -1434,7 +1395,7 @@ public class GemFireCacheImpl
try {
logCacheXML(url, cacheXmlDescription);
- InputStream stream = null;
+ InputStream stream;
if (cacheXmlDescription != null) {
if (logger.isTraceEnabled()) {
logger.trace("initializing cache with generated XML: {}", cacheXmlDescription);
@@ -1451,15 +1412,11 @@ public class GemFireCacheImpl
} catch (IOException ex) {
throw new CacheXmlException(
LocalizedStrings.GemFireCache_WHILE_OPENING_CACHE_XML_0_THE_FOLLOWING_ERROR_OCCURRED_1
- .toLocalizedString(new Object[] {url.toString(), ex}));
+ .toLocalizedString(url.toString(), ex));
} catch (CacheXmlException ex) {
- CacheXmlException newEx =
- new CacheXmlException(LocalizedStrings.GemFireCache_WHILE_READING_CACHE_XML_0_1
- .toLocalizedString(new Object[] {url, ex.getMessage()}));
- newEx.setStackTrace(ex.getStackTrace());
- newEx.initCause(ex.getCause());
- throw newEx;
+ throw new CacheXmlException(LocalizedStrings.GemFireCache_WHILE_READING_CACHE_XML_0_1
+ .toLocalizedString(url, ex.getMessage()), ex);
}
}
@@ -1536,11 +1493,6 @@ public class GemFireCacheImpl
protected class Stopper extends CancelCriterion {
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.CancelCriterion#cancelInProgress()
- */
@Override
public String cancelInProgress() {
String reason =
@@ -1557,11 +1509,6 @@ public class GemFireCacheImpl
return null;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.CancelCriterion#generateCancelledException(java.lang.Throwable)
- */
@Override
public RuntimeException generateCancelledException(Throwable e) {
String reason = cancelInProgress();
@@ -1659,8 +1606,9 @@ public class GemFireCacheImpl
* @see SystemFailure#loadEmergencyClasses()
*/
public static void loadEmergencyClasses() {
- if (emergencyClassesLoaded)
+ if (emergencyClassesLoaded) {
return;
+ }
emergencyClassesLoaded = true;
InternalDistributedSystem.loadEmergencyClasses();
AcceptorImpl.loadEmergencyClasses();
@@ -1687,7 +1635,6 @@ public class GemFireCacheImpl
GemFireCacheImpl.instance = null;
GemFireCacheImpl.pdxInstance = null;
// leave the PdxSerializer set if we have one to prevent 43412
- // TypeRegistry.setPdxSerializer(null);
// Shut down messaging first
InternalDistributedSystem ids = inst.system;
@@ -1706,12 +1653,10 @@ public class GemFireCacheImpl
System.err.println("DEBUG: Close cache servers");
}
{
- Iterator allCacheServersItr = inst.allCacheServers.iterator();
- while (allCacheServersItr.hasNext()) {
- CacheServerImpl bs = (CacheServerImpl) allCacheServersItr.next();
- AcceptorImpl ai = bs.getAcceptor();
- if (ai != null) {
- ai.emergencyClose();
+ for (CacheServerImpl server : inst.allCacheServers) {
+ AcceptorImpl acceptor = server.getAcceptor();
+ if (acceptor != null) {
+ acceptor.emergencyClose();
}
}
}
@@ -1725,13 +1670,10 @@ public class GemFireCacheImpl
System.err.println("DEBUG: closing gateway hubs");
}
- // These are synchronized sets -- avoid potential deadlocks
- // instance.pathToRegion.clear(); // garbage collection
- // instance.gatewayHubs.clear();
-
// rootRegions is intentionally *not* synchronized. The
// implementation of clear() does not currently allocate objects.
inst.rootRegions.clear();
+
// partitionedRegions is intentionally *not* synchronized, The
// implementation of clear() does not currently allocate objects.
inst.partitionedRegions.clear();
@@ -1752,7 +1694,7 @@ public class GemFireCacheImpl
private static final int shutdownAllPoolSize =
Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "SHUTDOWN_ALL_POOL_SIZE", -1);
- void shutdownSubTreeGracefully(Map<String, PartitionedRegion> prSubMap) {
+ private void shutdownSubTreeGracefully(Map<String, PartitionedRegion> prSubMap) {
for (final PartitionedRegion pr : prSubMap.values()) {
shutDownOnePRGracefully(pr);
}
@@ -1782,9 +1724,8 @@ public class GemFireCacheImpl
boolean testIGE = Boolean.getBoolean("TestInternalGemFireError");
if (testIGE) {
- InternalGemFireError assErr = new InternalGemFireError(
+ throw new InternalGemFireError(
LocalizedStrings.GemFireCache_UNEXPECTED_EXCEPTION.toLocalizedString());
- throw assErr;
}
// bug 44031 requires multithread shutdownall should be grouped
@@ -1805,7 +1746,9 @@ public class GemFireCacheImpl
}
});
} // for each root
+
es.shutdown();
+
try {
es.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
@@ -1827,20 +1770,21 @@ public class GemFireCacheImpl
}
private ExecutorService getShutdownAllExecutorService(int size) {
- final ThreadGroup thrGrp = LoggingThreadGroup.createThreadGroup("ShutdownAllGroup", logger);
- ThreadFactory thrFactory = new ThreadFactory() {
+ final ThreadGroup threadGroup =
+ LoggingThreadGroup.createThreadGroup("ShutdownAllGroup", logger);
+ ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger threadCount = new AtomicInteger(1);
@Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(thrGrp, r, "ShutdownAll-" + threadCount.getAndIncrement());
- t.setDaemon(true);
- return t;
+ public Thread newThread(Runnable runnable) {
+ Thread thread =
+ new Thread(threadGroup, runnable, "ShutdownAll-" + threadCount.getAndIncrement());
+ thread.setDaemon(true);
+ return thread;
}
};
- ExecutorService es = Executors
- .newFixedThreadPool(shutdownAllPoolSize == -1 ? size : shutdownAllPoolSize, thrFactory);
- return es;
+ return Executors.newFixedThreadPool(shutdownAllPoolSize == -1 ? size : shutdownAllPoolSize,
+ threadFactory);
}
private void shutDownOnePRGracefully(PartitionedRegion pr) {
@@ -1912,7 +1856,7 @@ public class GemFireCacheImpl
continue;
}
Map<InternalDistributedMember, PersistentMemberID> persistMap =
- getSubMapForLiveMembers(pr, membersToPersistOfflineEqual, bucketMaps[br.getId()]);
+ getSubMapForLiveMembers(membersToPersistOfflineEqual, bucketMaps[br.getId()]);
if (persistMap != null) {
br.getPersistenceAdvisor().persistMembersOfflineAndEqual(persistMap);
if (logger.isDebugEnabled()) {
@@ -1922,7 +1866,7 @@ public class GemFireCacheImpl
}
}
- // send persited profile update to other members, let all members to persist
+ // send persisted profile update to other members, let all members to persist
// before close the region
pr.setShutDownAllStatus(PartitionedRegion.OFFLINE_EQUAL_PERSISTED);
new UpdateAttributesProcessor(pr).distribute(false);
@@ -1930,7 +1874,7 @@ public class GemFireCacheImpl
if (logger.isDebugEnabled()) {
logger.debug("shutDownAll: PR {}: all offline_equal profiles received.", pr.getName());
}
- } // datastore
+ } // dataStore
// after done all steps for buckets, close pr
// close accessor directly
@@ -1952,7 +1896,6 @@ public class GemFireCacheImpl
.toLocalizedString(),
e);
}
- // pr.close();
} // synchronized
} catch (CacheClosedException cce) {
logger.debug("Encounter CacheClosedException when shutDownAll is closing PR: {}:{}",
@@ -1971,17 +1914,15 @@ public class GemFireCacheImpl
}
private Map<InternalDistributedMember, PersistentMemberID> getSubMapForLiveMembers(
- PartitionedRegion pr, Set<InternalDistributedMember> membersToPersistOfflineEqual,
+ Set<InternalDistributedMember> membersToPersistOfflineEqual,
Map<InternalDistributedMember, PersistentMemberID> bucketMap) {
if (bucketMap == null) {
return null;
}
- Map<InternalDistributedMember, PersistentMemberID> persistMap = new HashMap();
- Iterator itor = membersToPersistOfflineEqual.iterator();
- while (itor.hasNext()) {
- InternalDistributedMember idm = (InternalDistributedMember) itor.next();
- if (bucketMap.containsKey(idm)) {
- persistMap.put(idm, bucketMap.get(idm));
+ Map<InternalDistributedMember, PersistentMemberID> persistMap = new HashMap<>();
+ for (InternalDistributedMember member : membersToPersistOfflineEqual) {
+ if (bucketMap.containsKey(member)) {
+ persistMap.put(member, bucketMap.get(member));
}
}
return persistMap;
@@ -2154,10 +2095,10 @@ public class GemFireCacheImpl
return;
}
- /**
+ /*
* First close the ManagementService as it uses a lot of infra which will be closed by
* cache.close()
- **/
+ */
system.handleResourceEvent(ResourceEvent.CACHE_REMOVE, this);
if (this.listener != null) {
this.system.removeResourceListener(listener);
@@ -2209,11 +2150,10 @@ public class GemFireCacheImpl
// ignore
}
- GatewaySenderAdvisor advisor = null;
for (GatewaySender sender : this.getAllGatewaySenders()) {
try {
sender.stop();
- advisor = ((AbstractGatewaySender) sender).getSenderAdvisor();
+ GatewaySenderAdvisor advisor = ((AbstractGatewaySender) sender).getSenderAdvisor();
if (advisor != null) {
if (isDebugEnabled) {
logger.debug("Stopping the GatewaySender advisor");
@@ -2265,9 +2205,9 @@ public class GemFireCacheImpl
GemFireCacheImpl.pdxInstance = null;
}
- List rootRegionValues = null;
+ List<LocalRegion> rootRegionValues;
synchronized (this.rootRegions) {
- rootRegionValues = new ArrayList(this.rootRegions.values());
+ rootRegionValues = new ArrayList<>(this.rootRegions.values());
}
{
final Operation op;
@@ -2281,27 +2221,26 @@ public class GemFireCacheImpl
LocalRegion prRoot = null;
- for (Iterator itr = rootRegionValues.iterator(); itr.hasNext();) {
- LocalRegion lr = (LocalRegion) itr.next();
+ for (LocalRegion region : rootRegionValues) {
if (isDebugEnabled) {
- logger.debug("{}: processing region {}", this, lr.getFullPath());
+ logger.debug("{}: processing region {}", this, region.getFullPath());
}
- if (PartitionedRegionHelper.PR_ROOT_REGION_NAME.equals(lr.getName())) {
- prRoot = lr;
+ if (PartitionedRegionHelper.PR_ROOT_REGION_NAME.equals(region.getName())) {
+ prRoot = region;
} else {
- if (lr.getName().contains(ParallelGatewaySenderQueue.QSTRING)) {
+ if (region.getName().contains(ParallelGatewaySenderQueue.QSTRING)) {
continue; // this region will be closed internally by parent region
}
if (isDebugEnabled) {
- logger.debug("{}: closing region {}...", this, lr.getFullPath());
+ logger.debug("{}: closing region {}...", this, region.getFullPath());
}
try {
- lr.handleCacheClose(op);
+ region.handleCacheClose(op);
} catch (Exception e) {
if (isDebugEnabled || !forcedDisconnect) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.GemFireCache_0_ERROR_CLOSING_REGION_1,
- new Object[] {this, lr.getFullPath()}), e);
+ new Object[] {this, region.getFullPath()}), e);
}
}
}
@@ -2470,8 +2409,7 @@ public class GemFireCacheImpl
// do this late to prevent 43412
TypeRegistry.setPdxSerializer(null);
- for (Iterator iter = cacheLifecycleListeners.iterator(); iter.hasNext();) {
- CacheLifecycleListener listener = (CacheLifecycleListener) iter.next();
+ for (CacheLifecycleListener listener : cacheLifecycleListeners) {
listener.cacheClosed(this);
}
// Fix for #49856
@@ -2479,16 +2417,13 @@ public class GemFireCacheImpl
SystemFailure.signalCacheClose();
} // static synchronization on GemFireCache.class
-
}
- // see Cache.isReconnecting()
@Override
public boolean isReconnecting() {
return this.system.isReconnecting();
}
- // see Cache.waitUntilReconnected(long, TimeUnit)
@Override
public boolean waitUntilReconnected(long time, TimeUnit units) throws InterruptedException {
boolean systemReconnected = this.system.waitUntilReconnected(time, units);
@@ -2496,10 +2431,7 @@ public class GemFireCacheImpl
return false;
}
GemFireCacheImpl cache = getInstance();
- if (cache == null || !cache.isInitialized()) {
- return false;
- }
- return true;
+ return cache != null && cache.isInitialized;
}
@Override
@@ -2566,43 +2498,27 @@ public class GemFireCacheImpl
*/
private ThreadPoolExecutor diskStoreTaskPool = null;
- private void createDiskStoreTaskPool() {
- int MAXT = DiskStoreImpl.MAX_CONCURRENT_COMPACTIONS;
- final ThreadGroup compactThreadGroup =
- LoggingThreadGroup.createThreadGroup("Oplog Compactor Thread Group", logger);
- /*
- * final ThreadFactory compactThreadFactory = new ThreadFactory() { public Thread
- * newThread(Runnable command) { Thread thread = new Thread(compactThreadGroup, command,
- * "Idle OplogCompactor"); thread.setDaemon(true); return thread; } };
- */
-
- final ThreadFactory compactThreadFactory =
- GemfireCacheHelper.CreateThreadFactory(compactThreadGroup, "Idle OplogCompactor");
- this.diskStoreTaskPool = new ThreadPoolExecutor(MAXT, MAXT, 1, TimeUnit.SECONDS,
- new LinkedBlockingQueue(), compactThreadFactory);
- }
+ private final ConcurrentMap<String, DiskStoreImpl> diskStores = new ConcurrentHashMap<>();
- private final ConcurrentMap<String, DiskStoreImpl> diskStores =
- new ConcurrentHashMap<String, DiskStoreImpl>();
private final ConcurrentMap<String, DiskStoreImpl> regionOwnedDiskStores =
- new ConcurrentHashMap<String, DiskStoreImpl>();
+ new ConcurrentHashMap<>();
- public void addDiskStore(DiskStoreImpl dsi) {
+ void addDiskStore(DiskStoreImpl dsi) {
this.diskStores.put(dsi.getName(), dsi);
if (!dsi.isOffline()) {
getDiskStoreMonitor().addDiskStore(dsi);
}
}
- public void removeDiskStore(DiskStoreImpl dsi) {
+ void removeDiskStore(DiskStoreImpl dsi) {
this.diskStores.remove(dsi.getName());
this.regionOwnedDiskStores.remove(dsi.getName());
- /** Added for M&M **/
+ // Added for M&M
if (!dsi.getOwnedByRegion())
system.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, dsi);
}
- public void addRegionOwnedDiskStore(DiskStoreImpl dsi) {
+ void addRegionOwnedDiskStore(DiskStoreImpl dsi) {
this.regionOwnedDiskStores.put(dsi.getName(), dsi);
if (!dsi.isOffline()) {
getDiskStoreMonitor().addDiskStore(dsi);
@@ -2618,7 +2534,7 @@ public class GemFireCacheImpl
logger.debug("closing {}", dsi);
}
dsi.close();
- /** Added for M&M **/
+ // Added for M&M
system.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, dsi);
} catch (Exception e) {
logger.fatal(
@@ -2635,13 +2551,6 @@ public class GemFireCacheImpl
DEFAULT_DS_NAME = dsName;
}
- /**
- * Used by unit tests to undo a change to the default disk store name.
- */
- public static void unsetDefaultDiskStoreName() {
- DEFAULT_DS_NAME = DiskStoreFactory.DEFAULT_DISK_STORE_NAME;
- }
-
public static String getDefaultDiskStoreName() {
return DEFAULT_DS_NAME;
}
@@ -2687,138 +2596,48 @@ public class GemFireCacheImpl
@Override
public Collection<DiskStoreImpl> listDiskStoresIncludingRegionOwned() {
- HashSet<DiskStoreImpl> allDiskStores = new HashSet<DiskStoreImpl>();
+ HashSet<DiskStoreImpl> allDiskStores = new HashSet<>();
allDiskStores.addAll(this.diskStores.values());
allDiskStores.addAll(this.regionOwnedDiskStores.values());
return allDiskStores;
}
- public boolean executeDiskStoreTask(DiskStoreTask r) {
- synchronized (this.diskStoreTaskSync) {
- if (!this.diskStoreTaskSync.get()) {
- if (this.diskStoreTaskPool == null) {
- createDiskStoreTaskPool();
- }
- try {
- this.diskStoreTaskPool.execute(r);
- return true;
- } catch (RejectedExecutionException ex) {
- if (logger.isDebugEnabled()) {
- logger.debug("Ignored compact schedule during shutdown", ex);
- }
- }
- }
- }
- return false;
- }
-
private void stopDiskStoreTaskPool() {
synchronized (this.diskStoreTaskSync) {
this.diskStoreTaskSync.set(true);
// All the regions have already been closed
// so this pool shouldn't be doing anything.
if (this.diskStoreTaskPool != null) {
- List<Runnable> l = this.diskStoreTaskPool.shutdownNow();
- for (Runnable runnable : l) {
- if (l instanceof DiskStoreTask) {
- ((DiskStoreTask) l).taskCancelled();
- }
- }
- }
- // this.diskStoreTaskPool = null;
- }
- }
-
- public int stopGatewaySenders(boolean byShutdownAll) {
- final boolean isDebugEnabled = logger.isDebugEnabled();
-
- int cnt = 0;
- closingGatewaySendersByShutdownAll = byShutdownAll;
- synchronized (allGatewaySendersLock) {
- GatewaySenderAdvisor advisor = null;
- Iterator<GatewaySender> itr = allGatewaySenders.iterator();
- while (itr.hasNext()) {
- GatewaySender sender = itr.next();
- if (isDebugEnabled) {
- logger.debug("{}: stopping gateway sender {}", this, sender);
- }
- try {
- sender.stop();
- advisor = ((AbstractGatewaySender) sender).getSenderAdvisor();
- if (advisor != null) {
- if (isDebugEnabled) {
- logger.debug("Stopping the GatewaySender advisor");
- }
- advisor.close();
- }
- cnt++;
- } catch (CancelException e) {
- if (isDebugEnabled) {
- logger.debug("Ignored cache closure while closing sender {}", sender, e);
- }
- }
- }
- } // synchronized
-
- destroyGatewaySenderLockService();
-
- if (isDebugEnabled) {
- logger.debug("{}: finished stopping {} gateway sender(s), total is {}", this, cnt,
- allGatewaySenders.size());
- }
- return cnt;
- }
-
- public int stopGatewayReceivers(boolean byShutdownAll) {
- int cnt = 0;
- closingGatewayReceiversByShutdownAll = byShutdownAll;
- synchronized (allGatewayReceiversLock) {
- Iterator<GatewayReceiver> itr = allGatewayReceivers.iterator();
- while (itr.hasNext()) {
- GatewayReceiver receiver = itr.next();
- if (logger.isDebugEnabled()) {
- logger.debug("{}: stopping gateway receiver {}", this, receiver);
- }
- try {
- receiver.stop();
- cnt++;
- } catch (CancelException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("Ignored cache closure while closing receiver {}", receiver, e);
+ List<Runnable> listOfRunnables = this.diskStoreTaskPool.shutdownNow();
+ for (Runnable runnable : listOfRunnables) {
+ // TODO: fix this
+ if (listOfRunnables instanceof DiskStoreTask) {
+ ((DiskStoreTask) listOfRunnables).taskCancelled();
}
}
}
- } // synchronized
-
- if (logger.isDebugEnabled()) {
- logger.debug("{}: finished stopping {} gateway receiver(s), total is {}", this, cnt,
- allGatewayReceivers.size());
}
- return cnt;
}
- void stopServers() {
-
+ private void stopServers() {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("{}: stopping cache servers...", this);
}
boolean stoppedCacheServer = false;
- Iterator allCacheServersIterator = this.allCacheServers.iterator();
- while (allCacheServersIterator.hasNext()) {
- CacheServerImpl bridge = (CacheServerImpl) allCacheServersIterator.next();
+ for (CacheServerImpl cacheServer : this.allCacheServers) {
if (isDebugEnabled) {
- logger.debug("stopping bridge {}", bridge);
+ logger.debug("stopping bridge {}", cacheServer);
}
try {
- bridge.stop();
+ cacheServer.stop();
} catch (CancelException e) {
if (isDebugEnabled) {
- logger.debug("Ignored cache closure while closing bridge {}", bridge, e);
+ logger.debug("Ignored cache closure while closing bridge {}", cacheServer, e);
}
}
- allCacheServers.remove(bridge);
+ allCacheServers.remove(cacheServer);
stoppedCacheServer = true;
}
if (stoppedCacheServer) {
@@ -2854,7 +2673,6 @@ public class GemFireCacheImpl
// If a durable client stops/starts its cache, it needs
// to maintain the same unique id.
ClientProxyMembershipID.resetUniqueIdCounter();
-
}
@Override
@@ -2896,26 +2714,26 @@ public class GemFireCacheImpl
PartitionedRegion p = (PartitionedRegion) r;
return p.getRegionAdvisor().adviseAllPRNodes();
} else {
- return Collections.EMPTY_SET;
+ return Collections.emptySet();
}
}
@Override
public Set<InetSocketAddress> getCurrentServers() {
Map<String, Pool> pools = PoolManager.getAll();
- Set result = null;
+ Set<InetSocketAddress> result = null;
for (Pool p : pools.values()) {
PoolImpl pi = (PoolImpl) p;
for (Object o : pi.getCurrentServers()) {
ServerLocation sl = (ServerLocation) o;
if (result == null) {
- result = new HashSet<DistributedMember>();
+ result = new HashSet<>();
}
result.add(new InetSocketAddress(sl.getHostName(), sl.getPort()));
}
}
if (result == null) {
- return Collections.EMPTY_SET;
+ return Collections.emptySet();
} else {
return result;
}
@@ -2954,7 +2772,7 @@ public class GemFireCacheImpl
*
* @return the sweeper task
*/
- protected EventTracker.ExpiryTask getEventTrackerTask() {
+ EventTracker.ExpiryTask getEventTrackerTask() {
return this.recordedEventSweeper;
}
@@ -2975,7 +2793,7 @@ public class GemFireCacheImpl
* @return List of all instances of properties found for the given declarable
*/
public List<Properties> getDeclarableProperties(final String className) {
- List<Properties> propertiesList = new ArrayList<Properties>();
+ List<Properties> propertiesList = new ArrayList<>();
synchronized (this.declarablePropertiesMap) {
for (Map.Entry<Declarable, Properties> entry : this.declarablePropertiesMap.entrySet()) {
if (entry.getKey().getClass().getName().equals(className)) {
@@ -2997,15 +2815,6 @@ public class GemFireCacheImpl
}
/**
- * Returns the date and time that this cache was created.
- *
- * @since GemFire 3.5
- */
- public Date getCreationDate() {
- return this.creationDate;
- }
-
- /**
* Returns the number of seconds that have elapsed since the Cache was created.
*
* @since GemFire 3.5
@@ -3031,7 +2840,7 @@ public class GemFireCacheImpl
}
@Override
- public Region createVMRegion(String name, RegionAttributes attrs)
+ public <K, V> Region<K, V> createVMRegion(String name, RegionAttributes<K, V> attrs)
throws RegionExistsException, TimeoutException {
return createRegion(name, attrs);
}
@@ -3056,33 +2865,31 @@ public class GemFireCacheImpl
}
Pool pool = null;
// create the pool if it does not already exist
- if (this.clientpf == null) {
+ if (this.clientPoolFactory == null) {
Map<String, Pool> pools = PoolManager.getAll();
if (pools.isEmpty()) {
- this.clientpf = createDefaultPF();
+ this.clientPoolFactory = createDefaultPF();
} else if (pools.size() == 1) {
// otherwise use a singleton.
pool = pools.values().iterator().next();
} else {
- if (pool == null) {
- // act as if the default pool was configured
- // and see if we can find an existing one that is compatible
- PoolFactoryImpl pfi = (PoolFactoryImpl) createDefaultPF();
- for (Pool p : pools.values()) {
- if (((PoolImpl) p).isCompatible(pfi.getPoolAttributes())) {
- pool = p;
- break;
- }
- }
- if (pool == null) {
- // if pool is still null then we will not have a default pool for this ClientCache
- setDefaultPool(null);
- return;
+ // act as if the default pool was configured
+ // and see if we can find an existing one that is compatible
+ PoolFactoryImpl pfi = (PoolFactoryImpl) createDefaultPF();
+ for (Pool p : pools.values()) {
+ if (((PoolImpl) p).isCompatible(pfi.getPoolAttributes())) {
+ pool = p;
+ break;
}
}
+ if (pool == null) {
+ // if pool is still null then we will not have a default pool for this ClientCache
+ setDefaultPool(null);
+ return;
+ }
}
} else {
- PoolFactoryImpl pfi = (PoolFactoryImpl) this.clientpf;
+ PoolFactoryImpl pfi = (PoolFactoryImpl) this.clientPoolFactory;
if (pfi.getPoolAttributes().locators.isEmpty() && pfi.getPoolAttributes().servers.isEmpty()) {
try {
String localHostName = SocketCreator.getHostName(SocketCreator.getLocalHost());
@@ -3111,7 +2918,7 @@ public class GemFireCacheImpl
poolName = "DEFAULT" + count;
count++;
}
- pool = this.clientpf.create(poolName);
+ pool = this.clientPoolFactory.create(poolName);
}
setDefaultPool(pool);
}
@@ -3122,7 +2929,7 @@ public class GemFireCacheImpl
* @return the default pool that is right for us
*/
public Pool determineDefaultPool(PoolFactory pf) {
- Pool pool = null;
+ Pool pool;
// create the pool if it does not already exist
if (pf == null) {
Map<String, Pool> pools = PoolManager.getAll();
@@ -3175,7 +2982,7 @@ public class GemFireCacheImpl
}
@Override
- public Region createRegion(String name, RegionAttributes attrs)
+ public <K, V> Region<K, V> createRegion(String name, RegionAttributes<K, V> attrs)
throws RegionExistsException, TimeoutException {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
@@ -3183,7 +2990,7 @@ public class GemFireCacheImpl
return basicCreateRegion(name, attrs);
}
- public Region basicCreateRegion(String name, RegionAttributes attrs)
+ public <K, V> Region<K, V> basicCreateRegion(String name, RegionAttributes<K, V> attrs)
throws RegionExistsException, TimeoutException {
try {
InternalRegionArguments ira = new InternalRegionArguments().setDestroyLockFlag(true)
@@ -3221,8 +3028,8 @@ public class GemFireCacheImpl
LocalizedStrings.GemFireCache_ATTRIBUTES_MUST_NOT_BE_NULL.toLocalizedString());
}
- LocalRegion rgn = null;
- // final boolean getDestroyLock = attrs.getDestroyLockFlag();
+ LocalRegion region;
+
final InputStream snapshotInputStream = internalRegionArgs.getSnapshotInputStream();
InternalDistributedMember imageTarget = internalRegionArgs.getImageTarget();
final boolean recreate = internalRegionArgs.getRecreateFlag();
@@ -3230,17 +3037,15 @@ public class GemFireCacheImpl
final boolean isPartitionedRegion = attrs.getPartitionAttributes() != null;
final boolean isReinitCreate = snapshotInputStream != null || imageTarget != null || recreate;
- final String regionPath = LocalRegion.calcFullPath(name, null);
-
try {
for (;;) {
getCancelCriterion().checkCancelInProgress(null);
Future future = null;
synchronized (this.rootRegions) {
- rgn = (LocalRegion) this.rootRegions.get(name);
- if (rgn != null) {
- throw new RegionExistsException(rgn);
+ region = this.rootRegions.get(name);
+ if (region != null) {
+ throw new RegionExistsException(region);
}
// check for case where a root region is being reinitialized and we
// didn't
@@ -3252,27 +3057,21 @@ public class GemFireCacheImpl
}
if (future == null) {
if (internalRegionArgs.getInternalMetaRegion() != null) {
- rgn = internalRegionArgs.getInternalMetaRegion();
+ region = internalRegionArgs.getInternalMetaRegion();
} else if (isPartitionedRegion) {
- rgn = new PartitionedRegion(name, attrs, null, this, internalRegionArgs);
+ region = new PartitionedRegion(name, attrs, null, this, internalRegionArgs);
} else {
- /*
- * for (String senderId : attrs.getGatewaySenderIds()) { if
- * (getGatewaySender(senderId) != null && getGatewaySender(senderId).isParallel()) {
- * throw new IllegalStateException( LocalizedStrings.
- * AttributesFactory_PARALLELGATEWAYSENDER_0_IS_INCOMPATIBLE_WITH_DISTRIBUTED_REPLICATION
- * .toLocalizedString(senderId)); } }
- */
+
if (attrs.getScope().isLocal()) {
- rgn = new LocalRegion(name, attrs, null, this, internalRegionArgs);
+ region = new LocalRegion(name, attrs, null, this, internalRegionArgs);
} else {
- rgn = new DistributedRegion(name, attrs, null, this, internalRegionArgs);
+ region = new DistributedRegion(name, attrs, null, this, internalRegionArgs);
}
}
- this.rootRegions.put(name, rgn);
+ this.rootRegions.put(name, region);
if (isReinitCreate) {
- regionReinitialized(rgn);
+ regionReinitialized(region);
}
break;
}
@@ -3280,8 +3079,8 @@ public class GemFireCacheImpl
boolean interrupted = Thread.interrupted();
try { // future != null
- LocalRegion region = (LocalRegion) future.get(); // wait on Future
- throw new RegionExistsException(region);
+ LocalRegion regionThatExists = (LocalRegion) future.get(); // wait on Future
+ throw new RegionExistsException(regionThatExists);
} catch (InterruptedException e) {
interrupted = true;
} catch (ExecutionException e) {
@@ -3297,18 +3096,14 @@ public class GemFireCacheImpl
boolean success = false;
try {
- setRegionByPath(rgn.getFullPath(), rgn);
- rgn.initialize(snapshotInputStream, imageTarget, internalRegionArgs);
+ setRegionByPath(region.getFullPath(), region);
+ region.initialize(snapshotInputStream, imageTarget, internalRegionArgs);
success = true;
- } catch (CancelException e) {
- // don't print a call stack
- throw e;
- } catch (RedundancyAlreadyMetException e) {
- // don't log this
+ } catch (CancelException | RedundancyAlreadyMetException e) {
throw e;
} catch (final RuntimeException validationException) {
logger.warn(LocalizedMessage.create(
- LocalizedStrings.GemFireCache_INITIALIZATION_FAILED_FOR_REGION_0, rgn.getFullPath()),
+ LocalizedStrings.GemFireCache_INITIALIZATION_FAILED_FOR_REGION_0, region.getFullPath()),
validationException);
throw validationException;
} finally {
@@ -3316,7 +3111,7 @@ public class GemFireCacheImpl
try {
// do this before removing the region from
// the root set to fix bug 41982.
- rgn.cleanupFailedInitialization();
+ region.cleanupFailedInitialization();
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
@@ -3326,15 +3121,15 @@ public class GemFireCacheImpl
// bug #44672 - log the failure but don't override the original exception
logger.warn(LocalizedMessage.create(
- LocalizedStrings.GemFireCache_INIT_CLEANUP_FAILED_FOR_REGION_0, rgn.getFullPath()),
- t);
+ LocalizedStrings.GemFireCache_INIT_CLEANUP_FAILED_FOR_REGION_0,
+ region.getFullPath()), t);
} finally {
// clean up if initialize fails for any reason
- setRegionByPath(rgn.getFullPath(), null);
+ setRegionByPath(region.getFullPath(), null);
synchronized (this.rootRegions) {
- Region r = (Region) this.rootRegions.get(name);
- if (r == rgn) {
+ Region rootRegion = this.rootRegions.get(name);
+ if (rootRegion == region) {
this.rootRegions.remove(name);
}
} // synchronized
@@ -3342,9 +3137,7 @@ public class GemFireCacheImpl
} // success
}
-
-
- rgn.postCreateRegion();
+ region.postCreateRegion();
} catch (RegionExistsException ex) {
// outside of sync make sure region is initialized to fix bug 37563
LocalRegion r = (LocalRegion) ex.getRegion();
@@ -3352,15 +3145,14 @@ public class GemFireCacheImpl
throw ex;
}
- invokeRegionAfter(rgn);
- /**
- * Added for M&M . Putting the callback here to avoid creating RegionMBean in case of Exception
- **/
- if (!rgn.isInternalRegion()) {
- system.handleResourceEvent(ResourceEvent.REGION_CREATE, rgn);
+ invokeRegionAfter(region);
+
+ // Added for M&M . Putting the callback here to avoid creating RegionMBean in case of Exception
+ if (!region.isInternalRegion()) {
+ system.handleResourceEvent(ResourceEvent.REGION_CREATE, region);
}
- return rgn;
+ return region;
}
@Override
@@ -3380,7 +3172,7 @@ public class GemFireCacheImpl
}
@Override
- public final Region getRegion(String path) {
+ public <K, V> Region<K, V> getRegion(String path) {
return getRegion(path, false);
}
@@ -3390,7 +3182,7 @@ public class GemFireCacheImpl
* @since GemFire 6.0
*/
public Set<LocalRegion> getAllRegions() {
- Set<LocalRegion> result = new HashSet();
+ Set<LocalRegion> result = new HashSet<>();
synchronized (this.rootRegions) {
for (Object r : this.rootRegions.values()) {
if (r instanceof PartitionedRegion) {
@@ -3413,7 +3205,7 @@ public class GemFireCacheImpl
}
public Set<LocalRegion> getApplicationRegions() {
- Set<LocalRegion> result = new HashSet<LocalRegion>();
+ Set<LocalRegion> result = new HashSet<>();
synchronized (this.rootRegions) {
for (Object r : this.rootRegions.values()) {
LocalRegion rgn = (LocalRegion) r;
@@ -3480,7 +3272,7 @@ public class GemFireCacheImpl
String[] pathParts = parsePath(path);
LocalRegion root;
synchronized (this.rootRegions) {
- root = (LocalRegion) this.rootRegions.get(pathParts[0]);
+ root = this.rootRegions.get(pathParts[0]);
if (root == null)
return null;
}
@@ -3500,12 +3292,11 @@ public class GemFireCacheImpl
* @param returnDestroyedRegion if true, okay to return a destroyed region
*/
@Override
- public Region getRegion(String path, boolean returnDestroyedRegion) {
+ public <K, V> Region<K, V> getRegion(String path, boolean returnDestroyedRegion) {
stopper.checkCancelInProgress(null);
{
LocalRegion result = getRegionByPath(path);
// Do not waitOnInitialization() for PR
- // if (result != null && !(result instanceof PartitionedRegion)) {
if (result != null) {
result.waitOnInitialization();
if (!returnDestroyedRegion && result.isDestroyed()) {
@@ -3520,7 +3311,7 @@ public class GemFireCacheImpl
String[] pathParts = parsePath(path);
LocalRegion root;
synchronized (this.rootRegions) {
- root = (LocalRegion) this.rootRegions.get(pathParts[0]);
+ root = this.rootRegions.get(pathParts[0]);
if (root == null) {
if (logger.isDebugEnabled()) {
logger.debug("GemFireCache.getRegion, no region found for {}", pathParts[0]);
@@ -3561,7 +3352,7 @@ public class GemFireCacheImpl
LocalRegion root;
LogWriterI18n logger = getLoggerI18n();
synchronized (this.rootRegions) {
- root = (LocalRegion) this.rootRegions.get(pathParts[0]);
+ root = this.rootRegions.get(pathParts[0]);
if (root == null) {
if (logger.fineEnabled()) {
logger.fine("GemFireCache.getRegion, no region found for " + pathParts[0]);
@@ -3600,7 +3391,7 @@ public class GemFireCacheImpl
}
/** Return true if this region is initializing */
- boolean isGlobalRegionInitializing(LocalRegion region) {
+ private boolean isGlobalRegionInitializing(LocalRegion region) {
boolean result = region != null && region.scope.isGlobal() && !region.isInitialized();
if (result) {
if (logger.isDebugEnabled()) {
@@ -3611,33 +3402,32 @@ public class GemFireCacheImpl
}
@Override
- public Set rootRegions() {
+ public Set<Region<?, ?>> rootRegions() {
return rootRegions(false);
}
- public final Set rootRegions(boolean includePRAdminRegions) {
+ public Set<Region<?, ?>> rootRegions(boolean includePRAdminRegions) {
return rootRegions(includePRAdminRegions, true);
}
- private final Set rootRegions(boolean includePRAdminRegions, boolean waitForInit) {
+ private Set<Region<?, ?>> rootRegions(boolean includePRAdminRegions, boolean waitForInit) {
stopper.checkCancelInProgress(null);
- Set regions = new HashSet();
+ Set<Region<?, ?>> regions = new HashSet<>();
synchronized (this.rootRegions) {
- for (Iterator itr = this.rootRegions.values().iterator(); itr.hasNext();) {
- LocalRegion r = (LocalRegion) itr.next();
+ for (LocalRegion region : this.rootRegions.values()) {
// If this is an internal meta-region, don't return it to end user
- if (r.isSecret() || r.isUsedForMetaRegion() || r instanceof HARegion
- || !includePRAdminRegions
- && (r.isUsedForPartitionedRegionAdmin() || r.isUsedForPartitionedRegionBucket())) {
- continue; // Skip administrative PartitionedRegions
+ if (region.isSecret() || region.isUsedForMetaRegion() || region instanceof HARegion
+ || !includePRAdminRegions && (region.isUsedForPartitionedRegionAdmin()
+ || region.isUsedForPartitionedRegionBucket())) {
+ // Skip administrative PartitionedRegions
+ continue;
}
- regions.add(r);
+ regions.add(region);
}
}
if (waitForInit) {
for (Iterator r = regions.iterator(); r.hasNext();) {
LocalRegion lr = (LocalRegion) r.next();
- // lr.waitOnInitialization();
if (!lr.checkForInitialization()) {
r.remove();
}
@@ -3654,12 +3444,12 @@ public class GemFireCacheImpl
@Override
public void cleanupForClient(CacheClientNotifier ccn, ClientProxyMembershipID client) {
try {
- if (isClosed())
+ if (isClosed()) {
return;
- Iterator it = rootRegions(false, false).iterator();
- while (it.hasNext()) {
- LocalRegion lr = (LocalRegion) it.next();
- lr.cleanupForClient(ccn, client);
+ }
+ Set<Region<?, ?>> rootRegions = rootRegions(false, false);
+ for (Region region : rootRegions) {
+ ((LocalRegion) region).cleanupForClient(ccn, client);
}
} catch (DistributedSystemDisconnectedException ignore) {
}
@@ -3849,9 +3639,9 @@ public class GemFireCacheImpl
public boolean removeRoot(LocalRegion rootRgn) {
synchronized (this.rootRegions) {
String rgnName = rootRgn.getName();
- LocalRegion found = (LocalRegion) this.rootRegions.get(rgnName);
+ LocalRegion found = this.rootRegions.get(rgnName);
if (found == rootRgn) {
- LocalRegion previous = (LocalRegion) this.rootRegions.remove(rgnName);
+ LocalRegion previous = this.rootRegions.remove(rgnName);
Assert.assertTrue(previous == rootRgn);
return true;
} else
@@ -3902,14 +3692,17 @@ public class GemFireCacheImpl
}
}
+ @Override
public void addRegionListener(RegionListener l) {
this.regionListeners.add(l);
}
+ @Override
public void removeRegionListener(RegionListener l) {
this.regionListeners.remove(l);
}
+ @Override
public Set<RegionListener> getRegionListeners() {
return Collections.unmodifiableSet(this.regionListeners);
}
@@ -3934,8 +3727,7 @@ public class GemFireCacheImpl
}
/**
- * @see CacheClientProxy
- * @guarded.By {@link #ccpTimerMutex}
+ * @see CacheClientProxy GuardedBy {@link #ccpTimerMutex}
*/
private SystemTimer ccpTimer;
@@ -4049,7 +3841,7 @@ public class GemFireCacheImpl
synchronized (allGatewaySendersLock) {
if (!allGatewaySenders.contains(sender)) {
new UpdateAttributesProcessor((AbstractGatewaySender) sender).distribute(true);
- Set<GatewaySender> tmp = new HashSet<GatewaySender>(allGatewaySenders.size() + 1);
+ Set<GatewaySender> tmp = new HashSet<>(allGatewaySenders.size() + 1);
if (!allGatewaySenders.isEmpty()) {
tmp.addAll(allGatewaySenders);
}
@@ -4097,7 +3889,7 @@ public class GemFireCacheImpl
synchronized (allGatewaySendersLock) {
if (allGatewaySenders.contains(sender)) {
new UpdateAttributesProcessor((AbstractGatewaySender) sender, true).distribute(true);
- Set<GatewaySender> tmp = new HashSet<GatewaySender>(allGatewaySenders.size() - 1);
+ Set<GatewaySender> tmp = new HashSet<>(allGatewaySenders.size() - 1);
if (!allGatewaySenders.isEmpty()) {
tmp.addAll(allGatewaySenders);
}
@@ -4113,7 +3905,7 @@ public class GemFireCacheImpl
}
stopper.checkCancelInProgress(null);
synchronized (allGatewayReceiversLock) {
- Set<GatewayReceiver> tmp = new HashSet<GatewayReceiver>(allGatewayReceivers.size() + 1);
+ Set<GatewayReceiver> tmp = new HashSet<>(allGatewayReceivers.size() + 1);
if (!allGatewayReceivers.isEmpty()) {
tmp.addAll(allGatewayReceivers);
}
@@ -4137,7 +3929,7 @@ public class GemFireCacheImpl
*/
@Override
public Set<GatewaySender> getGatewaySenders() {
- Set<GatewaySender> tempSet = new HashSet<GatewaySender>();
+ Set<GatewaySender> tempSet = new HashSet<>();
for (GatewaySender sender : allGatewaySenders) {
if (!((AbstractGatewaySender) sender).isForInternalUse()) {
tempSet.add(sender);
@@ -4220,15 +4012,13 @@ public class GemFireCacheImpl
@Override
public List<CacheServer> getCacheServers() {
- List cacheServersWithoutReceiver = null;
+ List<CacheServer> cacheServersWithoutReceiver = null;
if (!allCacheServers.isEmpty()) {
- Iterator allCacheServersIterator = allCacheServers.iterator();
- while (allCacheServersIterator.hasNext()) {
- CacheServerImpl cacheServer = (CacheServerImpl) allCacheServersIterator.next();
+ for (CacheServerImpl cacheServer : allCacheServers) {
// If CacheServer is a GatewayReceiver, don't return as part of CacheServers
if (!cacheServer.isGatewayReceiver()) {
if (cacheServersWithoutReceiver == null) {
- cacheServersWithoutReceiver = new ArrayList();
+ cacheServersWithoutReceiver = new ArrayList<>();
}
cacheServersWithoutReceiver.add(cacheServer);
}
@@ -4245,21 +4035,10 @@ public class GemFireCacheImpl
}
/**
- * notify partitioned regions that this cache requires all of their events
- */
- public void requiresPREvents() {
- synchronized (this.partitionedRegions) {
- for (Iterator it = this.partitionedRegions.iterator(); it.hasNext();) {
- ((PartitionedRegion) it.next()).cacheRequiresNotification();
- }
- }
- }
-
- /**
* add a partitioned region to the set of tracked partitioned regions. This is used to notify the
* regions when this cache requires, or does not require notification of all region/entry events.
*/
- public void addPartitionedRegion(PartitionedRegion r) {
+ void addPartitionedRegion(PartitionedRegion r) {
synchronized (this.partitionedRegions) {
if (r.isDestroyed()) {
if (logger.isDebugEnabled()) {
@@ -4276,15 +4055,16 @@ public class GemFireCacheImpl
/**
* Returns a set of all current partitioned regions for test hook.
*/
+ @Override
public Set<PartitionedRegion> getPartitionedRegions() {
synchronized (this.partitionedRegions) {
- return new HashSet<PartitionedRegion>(this.partitionedRegions);
+ return new HashSet<>(this.partitionedRegions);
}
}
private TreeMap<String, Map<String, PartitionedRegion>> getPRTrees() {
// prTree will save a sublist of PRs who are under the same root
- TreeMap<String, Map<String, PartitionedRegion>> prTrees = new TreeMap();
+ TreeMap<String, Map<String, PartitionedRegion>> prTrees = new TreeMap<>();
TreeMap<String, PartitionedRegion> prMap = getPartitionedRegionMap();
boolean hasColocatedRegion = false;
for (PartitionedRegion pr : prMap.values()) {
@@ -4304,7 +4084,7 @@ public class GemFireCacheImpl
TreeMap<String, PartitionedRegion> prSubMap =
(TreeMap<String, PartitionedRegion>) prTrees.get(rootName);
if (prSubMap == null) {
- prSubMap = new TreeMap();
+ prSubMap = new TreeMap<>();
prTrees.put(rootName, prSubMap);
}
prSubMap.put(pr.getFullPath(), pr);
@@ -4315,9 +4095,9 @@ public class GemFireCacheImpl
}
private TreeMap<String, PartitionedRegion> getPartitionedRegionMap() {
- TreeMap<String, PartitionedRegion> prMap = new TreeMap();
- for (Map.Entry<String, Region> entry : ((Map<String, Region>) pathToRegion).entrySet()) {
- String regionName = (String) entry.getKey();
+ TreeMap<String, PartitionedRegion> prMap = new TreeMap<>();
+ for (Map.Entry<String, Region> entry : pathToRegion.entrySet()) {
+ String regionName = entry.getKey();
Region region = entry.getValue();
// Don't wait for non partitioned regions
@@ -4342,7 +4122,7 @@ public class GemFireCacheImpl
private LinkedHashMap<String, PartitionedRegion> orderByColocation(
TreeMap<String, PartitionedRegion> prMap) {
- LinkedHashMap<String, PartitionedRegion> orderedPrMap = new LinkedHashMap();
+ LinkedHashMap<String, PartitionedRegion> orderedPrMap = new LinkedHashMap<>();
for (PartitionedRegion pr : prMap.values()) {
addColocatedChildRecursively(orderedPrMap, pr);
}
@@ -4362,17 +4142,14 @@ public class GemFireCacheImpl
* Notification adds to the messaging a PR must do on each put/destroy/invalidate operation and
* should be kept to a minimum
*
- * @param r the partitioned region
+ * @param pr the partitioned region
* @return true if the region should deliver all of its events to this cache
*/
@Override
- public boolean requiresNotificationFromPR(PartitionedRegion r) {
- boolean hasSerialSenders = hasSerialSenders(r);
- boolean result = hasSerialSenders;
+ public boolean requiresNotificationFromPR(PartitionedRegion pr) {
+ boolean result = hasSerialSenders(pr);
if (!result) {
- Iterator allCacheServersIterator = allCacheServers.iterator();
- while (allCacheServersIterator.hasNext()) {
- CacheServerImpl server = (CacheServerImpl) allCacheServersIterator.next();
+ for (CacheServerImpl server : allCacheServers) {
if (!server.getNotifyBySubscription()) {
result = true;
break;
@@ -4401,7 +4178,7 @@ public class GemFireCacheImpl
*
* @see #addPartitionedRegion(PartitionedRegion)
*/
- public void removePartitionedRegion(PartitionedRegion r) {
+ void removePartitionedRegion(PartitionedRegion r) {
synchronized (this.partitionedRegions) {
if (this.partitionedRegions.remove(r)) {
getCachePerfStats().incPartitionedRegions(-1);
@@ -4426,11 +4203,7 @@ public class GemFireCacheImpl
}
stopper.checkCancelInProgress(null);
- if (!this.isServer) {
- return (this.allCacheServers.size() > 0);
- } else {
- return true;
- }
+ return this.isServer || (this.allCacheServers.size() > 0);
}
@Override
@@ -4484,8 +4257,8 @@ public class GemFireCacheImpl
}
@Override
- public RegionAttributes getRegionAttributes(String id) {
- return (RegionAttributes) this.namedRegionAttributes.get(id);
+ public <K, V> RegionAttributes<K, V> getRegionAttributes(String id) {
+ return this.namedRegionAttributes.get(id);
}
@Override
@@ -4498,25 +4271,17 @@ public class GemFireCacheImpl
}
@Override
- public Map listRegionAttributes() {
+ public <K, V> Map<String, RegionAttributes<K, V>> listRegionAttributes() {
return Collections.unmodifiableMap(this.namedRegionAttributes);
}
- private static final ThreadLocal xmlCache = new ThreadLocal();
-
- /**
- * Returns the cache currently being xml initialized by the thread that calls this method. The
- * result will be null if the thread is not initializing a cache.
- */
- public static GemFireCacheImpl getXmlCache() {
- return (GemFireCacheImpl) xmlCache.get();
- }
+ private static final ThreadLocal<GemFireCacheImpl> xmlCache = new ThreadLocal<>();
@Override
public void loadCacheXml(InputStream stream)
throws TimeoutException, CacheWriterException, GatewayException, RegionExistsException {
// make this cache available to callbacks being initialized during xml create
- final Object oldValue = xmlCache.get();
+ final GemFireCacheImpl oldValue = xmlCache.get();
xmlCache.set(this);
try {
CacheXmlParser xml;
@@ -4526,12 +4291,12 @@ public class GemFireCacheImpl
Reader reader = new BufferedReader(new InputStreamReader(stream, "ISO-8859-1"));
Writer stringWriter = new StringWriter();
- int n = -1;
+ int n;
while ((n = reader.read(buffer)) != -1) {
stringWriter.write(buffer, 0, n);
}
- /**
+ /*
* Now replace all replaceable system properties here using <code>PropertyResolver</code>
*/
String replacedXmlString = resolver.processUnresolvableString(stringWriter.toString());
@@ -4614,7 +4379,7 @@ public class GemFireCacheImpl
// TODO make this a simple int guarded by riWaiters and get rid of the double-check
private final AtomicInteger registerInterestsInProgress = new AtomicInteger();
- private final ArrayList<SimpleWaiter> riWaiters = new ArrayList<SimpleWaiter>();
+ private final ArrayList<SimpleWaiter> riWaiters = new ArrayList<>();
private TypeRegistry pdxRegistry; // never changes but is currently only
// initialized in constructor by unit tests
@@ -4641,10 +4406,8 @@ public class GemFireCacheImpl
if (logger.isDebugEnabled()) {
logger.debug("registerInterestCompleted: Signalling end of register-interest");
}
- Iterator it = riWaiters.iterator();
- while (it.hasNext()) {
- SimpleWaiter sw = (SimpleWaiter) it.next();
- sw.doNotify();
+ for (SimpleWaiter simpleWaiter : riWaiters) {
+ simpleWaiter.doNotify();
}
riWaiters.clear();
} // all clear
@@ -4691,57 +4454,6 @@ public class GemFireCacheImpl
}
}
- /**
- * Wait for given sender queue to flush for given timeout.
- *
- * @param id ID of GatewaySender or AsyncEventQueue
- * @param isAsyncListener true if this is for an AsyncEventQueue and false if for a GatewaySender
- * @param maxWaitTime maximum time to wait in seconds; zero or -ve means infinite wait
- *
- * @return zero if maxWaitTime was not breached, -1 if queue could not be found or is closed, and
- * elapsed time if timeout was breached
- */
- public int waitForSenderQueueFlush(String id, boolean isAsyncListener, int maxWaitTime) {
- getCancelCriterion().checkCancelInProgress(null);
- AbstractGatewaySender gatewaySender = null;
- if (isAsyncListener) {
- AsyncEventQueueImpl asyncQueue = (AsyncEventQueueImpl) getAsyncEventQueue(id);
- if (asyncQueue != null) {
- gatewaySender = (AbstractGatewaySender) asyncQueue.getSender();
- }
- } else {
- gatewaySender = (AbstractGatewaySender) getGatewaySender(id);
- }
- RegionQueue rq;
- final long startTime = System.currentTimeMillis();
- long elapsedTime;
- if (maxWaitTime <= 0) {
- maxWaitTime = Integer.MAX_VALUE;
- }
- while (gatewaySender != null && gatewaySender.isRunning()
- && (rq = gatewaySender.getQueue()) != null) {
- if (rq.size() == 0) {
- // return zero since it was not a timeout
- return 0;
- }
- try {
- Thread.sleep(500);
- getCancelCriterion().checkCancelInProgress(null);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- getCancelCriterion().checkCancelInProgress(ie);
- }
- // clear interrupted flag before retry
- Thread.interrupted();
- elapsedTime = System.currentTimeMillis() - startTime;
- if (elapsedTime >= (maxWaitTime * 1000L)) {
- // return elapsed time
- return (int) (elapsedTime / 1000L);
- }
- }
- return -1;
- }
-
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
public void setQueryMonitorRequiredForResourceManager(boolean required) {
@@ -4764,7 +4476,7 @@ public class GemFireCacheImpl
boolean monitorRequired =
!QUERY_MONITOR_DISABLED_FOR_LOW_MEM && QUERY_MONITOR_REQUIRED_FOR_RESOURCE_MANAGER;
// Added for DUnit test purpose, which turns-on and off the this.TEST_MAX_QUERY_EXECUTION_TIME.
- if (!(this.MAX_QUERY_EXECUTION_TIME > 0 || this.TEST_MAX_QUERY_EXECUTION_TIME > 0
+ if (!(MAX_QUERY_EXECUTION_TIME > 0 || this.TEST_MAX_QUERY_EXECUTION_TIME > 0
|| monitorRequired)) {
// if this.TEST_MAX_QUERY_EXECUTION_TIME is set, send the QueryMonitor.
// Else send null, so that the QueryMonitor is turned-off.
@@ -4773,8 +4485,8 @@ public class GemFireCacheImpl
// Return the QueryMonitor service if MAX_QUERY_EXECUTION_TIME is set or it is required by the
// ResourceManager and not overriden by system property.
- if ((this.MAX_QUERY_EXECUTION_TIME > 0 || this.TEST_MAX_QUERY_EXECUTION_TIME > 0
- || monitorRequired) && this.queryMonitor == null) {
+ if ((MAX_QUERY_EXECUTION_TIME > 0 || this.TEST_MAX_QUERY_EXECUTION_TIME > 0 || monitorRequired)
+ && this.queryMonitor == null) {
synchronized (queryMonitorLock) {
if (this.queryMonitor == null) {
int maxTime = MAX_QUERY_EXECUTION_TIME > TEST_MAX_QUERY_EXECUTION_TIME
@@ -4814,7 +4526,7 @@ public class GemFireCacheImpl
SimpleWaiter() {}
- public void doWait() {
+ void doWait() {
synchronized (this) {
while (!this.notified) {
GemFireCacheImpl.this.getCancelCriterion().checkCancelInProgress(null);
@@ -4832,7 +4544,7 @@ public class GemFireCacheImpl
}
}
- public void doNotify() {
+ void doNotify() {
synchronized (this) {
this.notified = true;
this.notifyAll();
@@ -4876,7 +4588,7 @@ public class GemFireCacheImpl
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
} else {
- return new RegionFactoryImpl<K, V>(this, atts);
+ return new RegionFactoryImpl<>(this, atts);
}
}
@@ -4888,7 +4600,7 @@ public class GemFireCacheImpl
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
}
- return new RegionFactoryImpl<K, V>(this);
+ return new RegionFactoryImpl<>(this);
}
/**
@@ -4899,7 +4611,7 @@ public class GemFireCacheImpl
if (isClient()) {
throw new UnsupportedOperatio
<TRUNCATED>
[3/3] geode git commit: 2632: 1st pass cleaning up GemFireCacheImpl
Posted by kl...@apache.org.
2632: 1st pass cleaning up GemFireCacheImpl
* remove dead-code
* add @Override annotations
* remove uselss javadocs and comments
* reduce scope of constants/vars/methods where possible
* fix misc IDE warnings
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/625e71b3
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/625e71b3
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/625e71b3
Branch: refs/heads/feature/GEODE-2632-6
Commit: 625e71b3ebac66b2c1282c8a1ee7b9c958f9236a
Parents: 48d662a
Author: Kirk Lund <kl...@apache.org>
Authored: Thu Apr 20 10:43:04 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Thu Apr 20 10:59:49 2017 -0700
----------------------------------------------------------------------
.../geode/internal/cache/GemFireCacheImpl.java | 1007 ++++++------------
.../geode/internal/cache/InternalCache.java | 2 +-
.../internal/cache/xmlcache/CacheCreation.java | 2 +-
3 files changed, 352 insertions(+), 659 deletions(-)
----------------------------------------------------------------------