You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/04/25 20:42:52 UTC
[3/6] geode git commit: Safe refactorings
http://git-wip-us.apache.org/repos/asf/geode/blob/c5b8cbe8/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 56243e1..29e9f95 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -12,12 +12,69 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintStream;
+import java.io.Reader;
+import java.io.StringBufferInputStream;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.naming.Context;
+import javax.transaction.TransactionManager;
+
import com.sun.jna.Native;
import com.sun.jna.Platform;
import org.apache.commons.lang.StringUtils;
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.ForcedDisconnectException;
@@ -89,6 +146,7 @@ import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.CacheTime;
+import org.apache.geode.distributed.internal.ClusterConfigurationService;
import org.apache.geode.distributed.internal.DM;
import org.apache.geode.distributed.internal.DistributionAdvisee;
import org.apache.geode.distributed.internal.DistributionAdvisor;
@@ -103,7 +161,6 @@ import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.ResourceEvent;
import org.apache.geode.distributed.internal.ResourceEventsListener;
import org.apache.geode.distributed.internal.ServerLocation;
-import org.apache.geode.distributed.internal.ClusterConfigurationService;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.i18n.LogWriterI18n;
@@ -125,7 +182,6 @@ import org.apache.geode.internal.cache.partitioned.RedundancyAlreadyMetException
import org.apache.geode.internal.cache.persistence.BackupManager;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.cache.persistence.PersistentMemberManager;
-import org.apache.geode.internal.cache.persistence.query.TemporaryResultSetFactory;
import org.apache.geode.internal.cache.snapshot.CacheSnapshotServiceImpl;
import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
@@ -173,61 +229,6 @@ import org.apache.geode.pdx.internal.PdxInstanceFactoryImpl;
import org.apache.geode.pdx.internal.PdxInstanceImpl;
import org.apache.geode.pdx.internal.TypeRegistry;
import org.apache.geode.redis.GeodeRedisServer;
-import org.apache.logging.log4j.Logger;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.PrintStream;
-import java.io.Reader;
-import java.io.StringBufferInputStream;
-import java.io.StringWriter;
-import java.io.Writer;
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.ServiceLoader;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.naming.Context;
// TODO: somebody Come up with more reasonable values for {@link #DEFAULT_LOCK_TIMEOUT}, etc.
/**
@@ -238,23 +239,22 @@ public class GemFireCacheImpl
implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee, CacheTime {
private static final Logger logger = LogService.getLogger();
- // moved *SERIAL_NUMBER stuff to DistributionAdvisor
-
/** The default number of seconds to wait for a distributed lock */
- public static final int DEFAULT_LOCK_TIMEOUT = Integer
- .getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.defaultLockTimeout", 60).intValue();
+ public static final int DEFAULT_LOCK_TIMEOUT =
+ Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.defaultLockTimeout", 60);
/**
* The default duration (in seconds) of a lease on a distributed lock
*/
- public static final int DEFAULT_LOCK_LEASE = Integer
- .getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.defaultLockLease", 120).intValue();
+ public static final int DEFAULT_LOCK_LEASE =
+ Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.defaultLockLease", 120);
/** The default "copy on read" attribute value */
public static final boolean DEFAULT_COPY_ON_READ = false;
/** the last instance of GemFireCache created */
private static volatile GemFireCacheImpl instance = null;
+
/**
* Just like instance but is valid for a bit longer so that pdx can still find the cache during a
* close.
@@ -262,16 +262,15 @@ public class GemFireCacheImpl
private static volatile GemFireCacheImpl pdxInstance = null;
/**
- * The default amount of time to wait for a <code>netSearch</code> to complete
+ * The default amount of time to wait for a {@code netSearch} to complete
*/
- public static final int DEFAULT_SEARCH_TIMEOUT = Integer
- .getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.defaultSearchTimeout", 300).intValue();
+ public static final int DEFAULT_SEARCH_TIMEOUT =
+ Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.defaultSearchTimeout", 300);
/**
- * The <code>CacheLifecycleListener</code> s that have been registered in this VM
+ * The {@code CacheLifecycleListener} s that have been registered in this VM
*/
- private static final Set<CacheLifecycleListener> cacheLifecycleListeners =
- new HashSet<CacheLifecycleListener>();
+ private static final Set<CacheLifecycleListener> cacheLifecycleListeners = new HashSet<>();
/**
* Define gemfire.Cache.ASYNC_EVENT_LISTENERS=true to invoke event listeners in the background
@@ -284,80 +283,63 @@ public class GemFireCacheImpl
* (the default) then the size of the entry value is unchanged by a delta application. Not a final
* so that tests can change this value.
*
- * @since GemFire hitachi 6.1.2.9
+ * @since GemFire h****** 6.1.2.9
*/
- public static boolean DELTAS_RECALCULATE_SIZE =
+ static boolean DELTAS_RECALCULATE_SIZE =
Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "DELTAS_RECALCULATE_SIZE");
- public static final int EVENT_QUEUE_LIMIT = Integer
- .getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.EVENT_QUEUE_LIMIT", 4096).intValue();
- public static final int EVENT_THREAD_LIMIT = Integer
- .getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.EVENT_THREAD_LIMIT", 16).intValue();
+ private static final int EVENT_QUEUE_LIMIT =
+ Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.EVENT_QUEUE_LIMIT", 4096);
+
+ static final int EVENT_THREAD_LIMIT =
+ Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.EVENT_THREAD_LIMIT", 16);
/**
* System property to limit the max query-execution time. By default its turned off (-1), the time
- * is set in MiliSecs.
+ * is set in milliseconds.
*/
public static final int MAX_QUERY_EXECUTION_TIME =
- Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.MAX_QUERY_EXECUTION_TIME", -1)
- .intValue();
+ Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.MAX_QUERY_EXECUTION_TIME", -1);
/**
* System property to disable query monitor even if resource manager is in use
*/
- public final boolean QUERY_MONITOR_DISABLED_FOR_LOW_MEM = Boolean
+ private final boolean queryMonitorDisabledForLowMem = Boolean
.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "Cache.DISABLE_QUERY_MONITOR_FOR_LOW_MEMORY");
/**
* Property set to true if resource manager heap percentage is set and query monitor is required
*/
- public static Boolean QUERY_MONITOR_REQUIRED_FOR_RESOURCE_MANAGER = Boolean.FALSE;
-
- /**
- * This property defines internal function that will get executed on each node to fetch active
- * REST service endpoints (servers).
- */
- public static final String FIND_REST_ENABLED_SERVERS_FUNCTION_ID =
- FindRestEnabledServersFunction.class.getName();
-
- /**
- * True if the user is allowed lock when memory resources appear to be overcommitted.
- */
- public static final boolean ALLOW_MEMORY_LOCK_WHEN_OVERCOMMITTED =
- Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "Cache.ALLOW_MEMORY_OVERCOMMIT");
+ private static boolean queryMonitorRequiredForResourceManager = false;
- // time in ms
+ /** time in milliseconds */
private static final int FIVE_HOURS = 5 * 60 * 60 * 1000;
- /** To test MAX_QUERY_EXECUTION_TIME option. */
- public int TEST_MAX_QUERY_EXECUTION_TIME = -1;
- public boolean TEST_MAX_QUERY_EXECUTION_TIME_OVERRIDE_EXCEPTION = false;
- // ///////////////////// Instance Fields ///////////////////////
+ /** To test MAX_QUERY_EXECUTION_TIME option. */
+ public int testMaxQueryExecutionTime = -1;
private final InternalDistributedSystem system;
private final DM dm;
- // This is a HashMap because I know that clear() on it does
- // not allocate objects.
- private final HashMap rootRegions;
+ private final Map<String, LocalRegion> rootRegions;
/**
* True if this cache is being created by a ClientCacheFactory.
*/
private final boolean isClient;
- private PoolFactory clientpf;
+
+ private PoolFactory poolFactory;
+
/**
* It is not final to allow cache.xml parsing to set it.
*/
private Pool defaultPool;
- private final ConcurrentMap pathToRegion = new ConcurrentHashMap();
+ private final ConcurrentMap<String, Region<?, ?>> pathToRegion = new ConcurrentHashMap<>();
- protected volatile boolean isInitialized = false;
- protected volatile boolean isClosing = false;
- protected volatile boolean closingGatewaySendersByShutdownAll = false;
- protected volatile boolean closingGatewayReceiversByShutdownAll = false;
+ private volatile boolean isInitialized = false;
+ volatile boolean isClosing = false;
/** Amount of time (in seconds) to wait for a distributed lock */
private int lockTimeout = DEFAULT_LOCK_TIMEOUT;
@@ -365,7 +347,7 @@ public class GemFireCacheImpl
/** Amount of time a lease of a distributed lock lasts */
private int lockLease = DEFAULT_LOCK_LEASE;
- /** Amount of time to wait for a <code>netSearch</code> to complete */
+ /** Amount of time to wait for a {@code netSearch} to complete */
private int searchTimeout = DEFAULT_SEARCH_TIMEOUT;
private final CachePerfStats cachePerfStats;
@@ -381,14 +363,14 @@ public class GemFireCacheImpl
* retrieval operations. It is assumed that the traversal operations on cache servers list vastly
* outnumber the mutative operations such as add, remove.
*/
- private volatile List allCacheServers = new CopyOnWriteArrayList();
+ private final List<CacheServerImpl> allCacheServers = new CopyOnWriteArrayList<>();
/**
* Controls updates to the list of all gateway senders
*
* @see #allGatewaySenders
*/
- public final Object allGatewaySendersLock = new Object();
+ private final Object allGatewaySendersLock = new Object();
/**
* the set of all gateway senders. It may be fetched safely (for enumeration), but updates must by
@@ -400,22 +382,20 @@ public class GemFireCacheImpl
* The list of all async event queues added to the cache. CopyOnWriteArrayList is used to allow
* concurrent add, remove and retrieval operations.
*/
- private volatile Set<AsyncEventQueue> allVisibleAsyncEventQueues =
- new CopyOnWriteArraySet<AsyncEventQueue>();
+ private final Set<AsyncEventQueue> allVisibleAsyncEventQueues = new CopyOnWriteArraySet<>();
/**
* The list of all async event queues added to the cache. CopyOnWriteArrayList is used to allow
* concurrent add, remove and retrieval operations.
*/
- private volatile Set<AsyncEventQueue> allAsyncEventQueues =
- new CopyOnWriteArraySet<AsyncEventQueue>();
+ private final Set<AsyncEventQueue> allAsyncEventQueues = new CopyOnWriteArraySet<>();
/**
* Controls updates to the list of all gateway receivers
*
* @see #allGatewayReceivers
*/
- public final Object allGatewayReceiversLock = new Object();
+ private final Object allGatewayReceiversLock = new Object();
/**
* the list of all gateway Receivers. It may be fetched safely (for enumeration), but updates must
@@ -423,10 +403,10 @@ public class GemFireCacheImpl
*/
private volatile Set<GatewayReceiver> allGatewayReceivers = Collections.emptySet();
- /** PartitionedRegion instances (for required-events notification */
- // This is a HashSet because I know that clear() on it does not
- // allocate any objects.
- private final HashSet<PartitionedRegion> partitionedRegions = new HashSet<PartitionedRegion>();
+ /**
+ * PartitionedRegion instances (for required-events notification
+ */
+ private final Set<PartitionedRegion> partitionedRegions = new HashSet<>();
/**
* Fix for 42051 This is a map of regions that are in the process of being destroyed. We could
@@ -436,14 +416,14 @@ public class GemFireCacheImpl
* that ID if it receives it as part of the persistent view.
*/
private final ConcurrentMap<String, DistributedRegion> regionsInDestroy =
- new ConcurrentHashMap<String, DistributedRegion>();
+ new ConcurrentHashMap<>();
- public final Object allGatewayHubsLock = new Object();
+ private final Object allGatewayHubsLock = new Object();
/**
* conflict resolver for WAN, if any
*
- * @guarded.By {@link #allGatewayHubsLock}
+ * GuardedBy {@link #allGatewayHubsLock}
*/
private GatewayConflictResolver gatewayConflictResolver;
@@ -451,7 +431,7 @@ public class GemFireCacheImpl
private boolean isServer = false;
/** transaction manager for this cache */
- private final TXManagerImpl txMgr;
+ private final TXManagerImpl transactionManager;
private RestAgent restAgent;
@@ -461,7 +441,8 @@ public class GemFireCacheImpl
private volatile boolean copyOnRead = DEFAULT_COPY_ON_READ;
/** The named region attributes registered with this cache. */
- private final Map namedRegionAttributes = Collections.synchronizedMap(new HashMap());
+ private final Map<String, RegionAttributes<?, ?>> namedRegionAttributes =
+ Collections.synchronizedMap(new HashMap<>());
/**
* if this cache was forced to close due to a forced-disconnect, we retain a
@@ -473,24 +454,24 @@ public class GemFireCacheImpl
* if this cache was forced to close due to a forced-disconnect or system failure, this keeps
* track of the reason
*/
- protected volatile Throwable disconnectCause = null;
+ volatile Throwable disconnectCause = null;
/** context where this cache was created -- for debugging, really... */
- public Exception creationStack = null;
+ private Exception creationStack = null;
/**
* a system timer task for cleaning up old bridge thread event entries
*/
- private EventTracker.ExpiryTask recordedEventSweeper;
+ private final EventTracker.ExpiryTask recordedEventSweeper;
- private TombstoneService tombstoneService;
+ private final TombstoneService tombstoneService;
/**
* DistributedLockService for PartitionedRegions. Remains null until the first PartitionedRegion
* is created. Destroyed by GemFireCache when closing the cache. Protected by synchronization on
* this GemFireCache.
*
- * @guarded.By prLockServiceLock
+ * GuardedBy prLockServiceLock
*/
private DistributedLockService prLockService;
@@ -503,7 +484,7 @@ public class GemFireCacheImpl
* DistributedLockService for GatewaySenders. Remains null until the first GatewaySender is
* created. Destroyed by GemFireCache when closing the cache.
*
- * @guarded.By gatewayLockServiceLock
+ * GuardedBy gatewayLockServiceLock
*/
private volatile DistributedLockService gatewayLockService;
@@ -514,7 +495,7 @@ public class GemFireCacheImpl
private final InternalResourceManager resourceManager;
- private final AtomicReference<BackupManager> backupManager = new AtomicReference<BackupManager>();
+ private final AtomicReference<BackupManager> backupManager = new AtomicReference<>();
private HeapEvictor heapEvictor = null;
@@ -524,7 +505,7 @@ public class GemFireCacheImpl
private final Object offHeapEvictorLock = new Object();
- private ResourceEventsListener listener;
+ private ResourceEventsListener resourceEventsListener;
/**
* Enabled when CacheExistsException issues arise in debugging
@@ -539,7 +520,7 @@ public class GemFireCacheImpl
private final PersistentMemberManager persistentMemberManager;
- private ClientMetadataService clientMetadatService = null;
+ private ClientMetadataService clientMetadataService = null;
private final Object clientMetaDatServiceLock = new Object();
@@ -557,20 +538,17 @@ public class GemFireCacheImpl
private final DiskStoreMonitor diskMonitor;
- // Stores the properties used to initialize declarables.
- private final Map<Declarable, Properties> declarablePropertiesMap =
- new ConcurrentHashMap<Declarable, Properties>();
+ /**
+ * Stores the properties used to initialize declarables.
+ */
+ private final Map<Declarable, Properties> declarablePropertiesMap = new ConcurrentHashMap<>();
/** {@link PropertyResolver} to resolve ${} type property strings */
- protected static PropertyResolver resolver;
+ private final PropertyResolver resolver;
- protected static boolean xmlParameterizationEnabled =
+ private static final boolean XML_PARAMETERIZATION_ENABLED =
!Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "xml.parameterization.disabled");
- public static Runnable internalBeforeApplyChanges;
-
- public static Runnable internalBeforeNonTXBasicPut;
-
/**
* the memcachedServer instance that is started when {@link DistributionConfig#getMemcachedPort()}
* is specified
@@ -587,23 +565,19 @@ public class GemFireCacheImpl
*
* @since GemFire 8.1
*/
- private SimpleExtensionPoint<Cache> extensionPoint = new SimpleExtensionPoint<Cache>(this, this);
+ private final SimpleExtensionPoint<Cache> extensionPoint = new SimpleExtensionPoint<>(this, this);
private final CqService cqService;
- private final Set<RegionListener> regionListeners = new ConcurrentHashSet<RegionListener>();
+ private final Set<RegionListener> regionListeners = new ConcurrentHashSet<>();
- private final Map<Class<? extends CacheService>, CacheService> services =
- new HashMap<Class<? extends CacheService>, CacheService>();
+ private final Map<Class<? extends CacheService>, CacheService> services = new HashMap<>();
public static final int DEFAULT_CLIENT_FUNCTION_TIMEOUT = 0;
private static int clientFunctionTimeout;
- private final static Boolean DISABLE_AUTO_EVICTION =
- Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "disableAutoEviction");
-
- private static SecurityService securityService = SecurityService.getSecurityService();
+ private final SecurityService securityService = SecurityService.getSecurityService();
static {
// this works around jdk bug 6427854, reported in ticket #44434
@@ -629,15 +603,13 @@ public class GemFireCacheImpl
* and stack as well as new memory mapped files or shared memory regions.
*
* @return 0 if success, non-zero if error and errno set
- *
*/
private static native int mlockall(int flags);
public static void lockMemory() {
- int result = 0;
try {
Native.register(Platform.C_LIBRARY_NAME);
- result = mlockall(1);
+ int result = mlockall(1);
if (result == 0) {
return;
}
@@ -645,14 +617,14 @@ public class GemFireCacheImpl
throw new IllegalStateException("Error trying to lock memory", t);
}
- int errno = Native.getLastError();
- String msg = "mlockall failed: " + errno;
- if (errno == 1 || errno == 12) { // EPERM || ENOMEM
- msg = "Unable to lock memory due to insufficient free space or privileges. "
+ int lastError = Native.getLastError();
+ String message = "mlockall failed: " + lastError;
+ if (lastError == 1 || lastError == 12) { // EPERM || ENOMEM
+ message = "Unable to lock memory due to insufficient free space or privileges. "
+ "Please check the RLIMIT_MEMLOCK soft resource limit (ulimit -l) and "
+ "increase the available memory if needed";
}
- throw new IllegalStateException(msg);
+ throw new IllegalStateException(message);
}
/**
@@ -661,18 +633,18 @@ public class GemFireCacheImpl
*/
@Override
public String toString() {
- final StringBuffer sb = new StringBuffer();
+ final StringBuilder sb = new StringBuilder();
sb.append("GemFireCache[");
- sb.append("id = " + System.identityHashCode(this));
- sb.append("; isClosing = " + this.isClosing);
- sb.append("; isShutDownAll = " + isCacheAtShutdownAll());
- sb.append("; created = " + this.creationDate);
- sb.append("; server = " + this.isServer);
- sb.append("; copyOnRead = " + this.copyOnRead);
- sb.append("; lockLease = " + this.lockLease);
- sb.append("; lockTimeout = " + this.lockTimeout);
+ sb.append("id = ").append(System.identityHashCode(this));
+ sb.append("; isClosing = ").append(this.isClosing);
+ sb.append("; isShutDownAll = ").append(isCacheAtShutdownAll());
+ sb.append("; created = ").append(this.creationDate);
+ sb.append("; server = ").append(this.isServer);
+ sb.append("; copyOnRead = ").append(this.copyOnRead);
+ sb.append("; lockLease = ").append(this.lockLease);
+ sb.append("; lockTimeout = ").append(this.lockTimeout);
if (this.creationStack != null) {
- sb.append("\nCreation context:\n");
+ sb.append(System.lineSeparator()).append("Creation context:").append(System.lineSeparator());
OutputStream os = new OutputStream() {
@Override
public void write(int i) {
@@ -694,7 +666,7 @@ public class GemFireCacheImpl
return instance;
}
- /*
+ /**
* Used for testing, retain the old instance in the test and re-set the value when test completes
*/
public static GemFireCacheImpl setInstanceForTests(GemFireCacheImpl cache) {
@@ -709,7 +681,7 @@ public class GemFireCacheImpl
* @return the existing cache
* @throws CacheClosedException if an existing cache can not be found.
*/
- public static final GemFireCacheImpl getExisting() {
+ public static GemFireCacheImpl getExisting() {
final GemFireCacheImpl result = instance;
if (result != null && !result.isClosing) {
return result;
@@ -748,15 +720,6 @@ public class GemFireCacheImpl
return result;
}
- // /**
- // * @deprecated remove when Lise allows a Hydra VM to
- // * be re-created
- // */
- // public static void clearInstance() {
- // System.err.println("DEBUG: do not commit GemFireCache#clearInstance");
- // instance = null;
- // }
-
public static GemFireCacheImpl createClient(DistributedSystem system, PoolFactory pf,
CacheConfig cacheConfig) {
return basicCreate(system, true, cacheConfig, pf, true, ASYNC_EVENT_LISTENERS, null);
@@ -766,7 +729,7 @@ public class GemFireCacheImpl
return basicCreate(system, true, cacheConfig, null, false, ASYNC_EVENT_LISTENERS, null);
}
- public static GemFireCacheImpl createWithAsyncEventListeners(DistributedSystem system,
+ static GemFireCacheImpl createWithAsyncEventListeners(DistributedSystem system,
CacheConfig cacheConfig, TypeRegistry typeRegistry) {
return basicCreate(system, true, cacheConfig, null, false, true, typeRegistry);
}
@@ -776,8 +739,6 @@ public class GemFireCacheImpl
return basicCreate(system, existingOk, cacheConfig, null, false, ASYNC_EVENT_LISTENERS, null);
}
-
-
private static GemFireCacheImpl basicCreate(DistributedSystem system, boolean existingOk,
CacheConfig cacheConfig, PoolFactory pf, boolean isClient, boolean asyncEventListeners,
TypeRegistry typeRegistry) throws CacheExistsException, TimeoutException,
@@ -793,7 +754,7 @@ public class GemFireCacheImpl
return instance;
}
} catch (CacheXmlException | IllegalArgumentException e) {
- logger.error(e.getLocalizedMessage());
+ logger.error(e.getLocalizedMessage()); // TODO: log the full stack trace or not?
throw e;
} catch (Error | RuntimeException e) {
logger.error(e);
@@ -821,15 +782,15 @@ public class GemFireCacheImpl
}
/**
- * Creates a new instance of GemFireCache and populates it according to the
- * <code>cache.xml</code>, if appropriate.
+ * Creates a new instance of GemFireCache and populates it according to the {@code cache.xml}, if
+ * appropriate.
*
* @param typeRegistry: currently only unit tests set this parameter to a non-null value
*/
private GemFireCacheImpl(boolean isClient, PoolFactory pf, DistributedSystem system,
CacheConfig cacheConfig, boolean asyncEventListeners, TypeRegistry typeRegistry) {
this.isClient = isClient;
- this.clientpf = pf;
+ this.poolFactory = pf;
this.cacheConfig = cacheConfig; // do early for bug 43213
this.pdxRegistry = typeRegistry;
@@ -846,28 +807,25 @@ public class GemFireCacheImpl
// We only support management on members of a distributed system
// Should do this: if (!getSystem().isLoner()) {
// but it causes quickstart.CqClientTest to hang
- this.listener = new ManagementListener();
- this.system.addResourceListener(listener);
+ this.resourceEventsListener = new ManagementListener();
+ this.system.addResourceListener(this.resourceEventsListener);
if (this.system.isLoner()) {
this.system.getInternalLogWriter()
.info(LocalizedStrings.GemFireCacheImpl_RUNNING_IN_LOCAL_MODE);
}
} else {
- getLogger().info("Running in client mode");
- this.listener = null;
+ logger.info("Running in client mode");
+ this.resourceEventsListener = null;
}
// Don't let admin-only VMs create Cache's just yet.
- DM dm = this.system.getDistributionManager();
- if (dm instanceof DistributionManager) {
- if (((DistributionManager) dm).getDMType() == DistributionManager.ADMIN_ONLY_DM_TYPE) {
- throw new IllegalStateException(
- LocalizedStrings.GemFireCache_CANNOT_CREATE_A_CACHE_IN_AN_ADMINONLY_VM
- .toLocalizedString());
- }
+ if (this.dm.getDMType() == DistributionManager.ADMIN_ONLY_DM_TYPE) {
+ throw new IllegalStateException(
+ LocalizedStrings.GemFireCache_CANNOT_CREATE_A_CACHE_IN_AN_ADMINONLY_VM
+ .toLocalizedString());
}
- this.rootRegions = new HashMap();
+ this.rootRegions = new HashMap<>();
this.cqService = CqServiceProvider.create(this);
@@ -875,44 +833,39 @@ public class GemFireCacheImpl
this.cachePerfStats = new CachePerfStats(system);
CachePerfStats.enableClockStats = this.system.getConfig().getEnableTimeStatistics();
- this.txMgr = new TXManagerImpl(this.cachePerfStats, this);
- dm.addMembershipListener(this.txMgr);
+ this.transactionManager = new TXManagerImpl(this.cachePerfStats, this);
+ this.dm.addMembershipListener(this.transactionManager);
this.creationDate = new Date();
this.persistentMemberManager = new PersistentMemberManager();
if (asyncEventListeners) {
- final ThreadGroup group =
+ final ThreadGroup threadGroup =
LoggingThreadGroup.createThreadGroup("Message Event Threads", logger);
- ThreadFactory tf = new ThreadFactory() {
- @Override
- public Thread newThread(final Runnable command) {
- final Runnable r = new Runnable() {
- @Override
- public void run() {
- ConnectionTable.threadWantsSharedResources();
- command.run();
- }
- };
- Thread thread = new Thread(group, r, "Message Event Thread");
- thread.setDaemon(true);
- return thread;
- }
+ ThreadFactory threadFactory = (Runnable command) -> {
+ final Runnable runnable = () -> {
+ ConnectionTable.threadWantsSharedResources();
+ command.run();
+ };
+ Thread thread = new Thread(threadGroup, runnable, "Message Event Thread");
+ thread.setDaemon(true);
+ return thread;
};
- ArrayBlockingQueue q = new ArrayBlockingQueue(EVENT_QUEUE_LIMIT);
- this.eventThreadPool = new PooledExecutorWithDMStats(q, EVENT_THREAD_LIMIT,
- this.cachePerfStats.getEventPoolHelper(), tf, 1000);
+ ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(EVENT_QUEUE_LIMIT);
+ this.eventThreadPool = new PooledExecutorWithDMStats(queue, EVENT_THREAD_LIMIT,
+ this.cachePerfStats.getEventPoolHelper(), threadFactory, 1000);
} else {
this.eventThreadPool = null;
}
// Initialize the advisor here, but wait to exchange profiles until cache is fully built
this.resourceAdvisor = ResourceAdvisor.createResourceAdvisor(this);
+
// Initialize the advisor here, but wait to exchange profiles until cache is fully built
this.jmxAdvisor = JmxManagerAdvisor.createJmxManagerAdvisor(new JmxManagerAdvisee(this));
- resourceManager = InternalResourceManager.createResourceManager(this);
+ this.resourceManager = InternalResourceManager.createResourceManager(this);
this.serialNumber = DistributionAdvisor.createSerialNumber();
getInternalResourceManager().addResourceListener(ResourceType.HEAP_MEMORY, getHeapEvictor());
@@ -925,15 +878,15 @@ public class GemFireCacheImpl
getOffHeapEvictor());
}
- recordedEventSweeper = EventTracker.startTrackerServices(this);
- tombstoneService = TombstoneService.initialize(this);
+ this.recordedEventSweeper = EventTracker.startTrackerServices(this);
+ this.tombstoneService = TombstoneService.initialize(this);
TypeRegistry.init();
basicSetPdxSerializer(this.cacheConfig.getPdxSerializer());
TypeRegistry.open();
if (!isClient()) {
- // Initialize the QRM thread freqeuncy to default (1 second )to prevent spill
+ // Initialize the QRM thread frequency to default (1 second )to prevent spill
// over from previous Cache , as the interval is stored in a static
// volatile field.
HARegionQueue.setMessageSyncInterval(HARegionQueue.DEFAULT_MESSAGE_SYNC_INTERVAL);
@@ -948,26 +901,28 @@ public class GemFireCacheImpl
}
this.txEntryStateFactory = TXEntryState.getFactory();
- if (xmlParameterizationEnabled) {
- /** If product properties file is available replace properties from there */
+ if (XML_PARAMETERIZATION_ENABLED) {
+ // If product properties file is available replace properties from there
Properties userProps = this.system.getConfig().getUserDefinedProps();
if (userProps != null && !userProps.isEmpty()) {
- resolver = new CacheXmlPropertyResolver(false,
+ this.resolver = new CacheXmlPropertyResolver(false,
PropertyResolver.NO_SYSTEM_PROPERTIES_OVERRIDE, userProps);
} else {
- resolver = new CacheXmlPropertyResolver(false,
+ this.resolver = new CacheXmlPropertyResolver(false,
PropertyResolver.NO_SYSTEM_PROPERTIES_OVERRIDE, null);
}
+ } else {
+ this.resolver = null;
}
SystemFailure.signalCacheCreate();
- diskMonitor = new DiskStoreMonitor();
+ this.diskMonitor = new DiskStoreMonitor();
} // synchronized
}
public boolean isRESTServiceRunning() {
- return isRESTServiceRunning;
+ return this.isRESTServiceRunning;
}
public void setRESTServiceRunning(boolean isRESTServiceRunning) {
@@ -980,23 +935,25 @@ public class GemFireCacheImpl
* @return RestAgent
*/
public RestAgent getRestAgent() {
- return restAgent;
+ return this.restAgent;
}
- /*****
+ /**
* Request the shared configuration from the locator(s) which have the Cluster config service
* running
*/
- public ConfigurationResponse requestSharedConfiguration() {
+ private ConfigurationResponse requestSharedConfiguration() {
final DistributionConfig config = this.system.getConfig();
- if (!(dm instanceof DistributionManager))
+ if (!(this.dm instanceof DistributionManager)) {
return null;
+ }
// do nothing if this vm is/has locator or this is a client
- if (((DistributionManager) dm).getDMType() == DistributionManager.LOCATOR_DM_TYPE || isClient
- || Locator.getLocator() != null)
+ if (this.dm.getDMType() == DistributionManager.LOCATOR_DM_TYPE || this.isClient
+ || Locator.getLocator() != null) {
return null;
+ }
// can't simply return null if server is not using shared configuration, since we need to find
// out
@@ -1015,13 +972,11 @@ public class GemFireCacheImpl
return null;
}
-
- ConfigurationResponse response = null;
List<String> locatorConnectionStrings = getSharedConfigLocatorConnectionStringList();
try {
- response = ClusterConfigurationLoader.requestConfigurationFromLocators(system.getConfig(),
- locatorConnectionStrings);
+ ConfigurationResponse response = ClusterConfigurationLoader
+ .requestConfigurationFromLocators(this.system.getConfig(), locatorConnectionStrings);
// log the configuration received from the locator
logger.info(LocalizedMessage
@@ -1031,7 +986,7 @@ public class GemFireCacheImpl
Configuration clusterConfig =
response.getRequestedConfiguration().get(ClusterConfigurationService.CLUSTER_CONFIG);
Properties clusterSecProperties =
- (clusterConfig == null) ? new Properties() : clusterConfig.getGemfireProperties();
+ clusterConfig == null ? new Properties() : clusterConfig.getGemfireProperties();
// If not using shared configuration, return null or throw an exception is locator is secured
if (!config.getUseSharedConfiguration()) {
@@ -1064,15 +1019,10 @@ public class GemFireCacheImpl
}
}
- public void deployJarsRecevedFromClusterConfiguration(ConfigurationResponse response) {
+ private void deployJarsReceivedFromClusterConfiguration(ConfigurationResponse response) {
try {
ClusterConfigurationLoader.deployJarsReceivedFromClusterConfiguration(this, response);
- } catch (IOException e) {
- throw new GemFireConfigException(
- LocalizedStrings.GemFireCache_EXCEPTION_OCCURED_WHILE_DEPLOYING_JARS_FROM_SHARED_CONDFIGURATION
- .toLocalizedString(),
- e);
- } catch (ClassNotFoundException e) {
+ } catch (IOException | ClassNotFoundException e) {
throw new GemFireConfigException(
LocalizedStrings.GemFireCache_EXCEPTION_OCCURED_WHILE_DEPLOYING_JARS_FROM_SHARED_CONDFIGURATION
.toLocalizedString(),
@@ -1080,10 +1030,10 @@ public class GemFireCacheImpl
}
}
-
- // When called, clusterProps and serverProps and key could not be null
- public static boolean isMisConfigured(Properties clusterProps, Properties serverProps,
- String key) {
+ /**
+ * When called, clusterProps and serverProps and key could not be null
+ */
+ static boolean isMisConfigured(Properties clusterProps, Properties serverProps, String key) {
String clusterPropValue = clusterProps.getProperty(key);
String serverPropValue = serverProps.getProperty(key);
@@ -1095,36 +1045,31 @@ public class GemFireCacheImpl
if (StringUtils.isBlank(clusterPropValue))
return true;
- // at this point check for eqality
+ // at this point check for equality
return !clusterPropValue.equals(serverPropValue);
}
- public List<String> getSharedConfigLocatorConnectionStringList() {
- List<String> locatorConnectionStringList = new ArrayList<String>();
+ private List<String> getSharedConfigLocatorConnectionStringList() {
+ List<String> locatorConnectionStringList = new ArrayList<>();
- Map<InternalDistributedMember, Collection<String>> scl =
+ Map<InternalDistributedMember, Collection<String>> locatorsWithClusterConfig =
this.getDistributionManager().getAllHostedLocatorsWithSharedConfiguration();
// If there are no locators with Shared configuration, that means the system has been started
// without shared configuration
// then do not make requests to the locators
- if (!scl.isEmpty()) {
- Set<Entry<InternalDistributedMember, Collection<String>>> locs = scl.entrySet();
+ if (!locatorsWithClusterConfig.isEmpty()) {
+ Set<Entry<InternalDistributedMember, Collection<String>>> locators =
+ locatorsWithClusterConfig.entrySet();
- for (Entry<InternalDistributedMember, Collection<String>> loc : locs) {
+ for (Entry<InternalDistributedMember, Collection<String>> loc : locators) {
Collection<String> locStrings = loc.getValue();
- Iterator<String> locStringIter = locStrings.iterator();
-
- while (locStringIter.hasNext()) {
- locatorConnectionStringList.add(locStringIter.next());
- }
+ locatorConnectionStringList.addAll(locStrings);
}
}
return locatorConnectionStringList;
}
-
-
/**
* Used by unit tests to force cache creation to use a test generated cache.xml
*/
@@ -1149,7 +1094,7 @@ public class GemFireCacheImpl
return this.isClient || !getAllPools().isEmpty();
}
- private Collection<Pool> getAllPools() {
+ private static Collection<Pool> getAllPools() {
Collection<Pool> pools = PoolManagerImpl.getPMI().getMap().values();
for (Iterator<Pool> itr = pools.iterator(); itr.hasNext();) {
PoolImpl pool = (PoolImpl) itr.next();
@@ -1168,8 +1113,8 @@ public class GemFireCacheImpl
return this.defaultPool;
}
- private void setDefaultPool(Pool v) {
- this.defaultPool = v;
+ private void setDefaultPool(Pool value) {
+ this.defaultPool = value;
}
/**
@@ -1184,9 +1129,7 @@ public class GemFireCacheImpl
GemFireCacheImpl.instance = this;
GemFireCacheImpl.pdxInstance = this;
- for (Iterator<CacheLifecycleListener> iter = cacheLifecycleListeners.iterator(); iter
- .hasNext();) {
- CacheLifecycleListener listener = (CacheLifecycleListener) iter.next();
+ for (CacheLifecycleListener listener : cacheLifecycleListeners) {
listener.cacheCreated(this);
}
@@ -1194,24 +1137,23 @@ public class GemFireCacheImpl
// request and check cluster configuration
ConfigurationResponse configurationResponse = requestSharedConfiguration();
- deployJarsRecevedFromClusterConfiguration(configurationResponse);
+ deployJarsReceivedFromClusterConfiguration(configurationResponse);
// apply the cluster's properties configuration and initialize security using that configuration
ClusterConfigurationLoader.applyClusterPropertiesConfiguration(this, configurationResponse,
- system.getConfig());
+ this.system.getConfig());
// first initialize the security service using the security properties
- securityService.initSecurity(system.getConfig().getSecurityProps());
+ this.securityService.initSecurity(this.system.getConfig().getSecurityProps());
// secondly if cacheConfig has a securityManager, use that instead
- if (cacheConfig.getSecurityManager() != null) {
- securityService.setSecurityManager(cacheConfig.getSecurityManager());
+ if (this.cacheConfig.getSecurityManager() != null) {
+ this.securityService.setSecurityManager(this.cacheConfig.getSecurityManager());
}
// if cacheConfig has a postProcessor, use that instead
- if (cacheConfig.getPostProcessor() != null) {
- securityService.setPostProcessor(cacheConfig.getPostProcessor());
+ if (this.cacheConfig.getPostProcessor() != null) {
+ this.securityService.setPostProcessor(this.cacheConfig.getPostProcessor());
}
-
SystemMemberCacheEventProcessor.send(this, Operation.CACHE_CREATE);
this.resourceAdvisor.initializationGate();
@@ -1225,23 +1167,21 @@ public class GemFireCacheImpl
// we will not be ready for all the events that cache.xml
// processing can deliver (region creation, etc.).
// This call may need to be moved inside initializeDeclarativeCache.
- /** Entry to GemFire Management service **/
- this.jmxAdvisor.initializationGate();
+ this.jmxAdvisor.initializationGate(); // Entry to GemFire Management service
// this starts up the ManagementService, register and federate the internal beans
- system.handleResourceEvent(ResourceEvent.CACHE_CREATE, this);
-
- boolean completedCacheXml = false;
+ this.system.handleResourceEvent(ResourceEvent.CACHE_CREATE, this);
initializeServices();
+ boolean completedCacheXml = false;
try {
if (configurationResponse == null) {
// Deploy all the jars from the deploy working dir.
ClassPathLoader.getLatest().getJarDeployer().loadPreviouslyDeployedJarsFromDisk();
}
ClusterConfigurationLoader.applyClusterXmlConfiguration(this, configurationResponse,
- system.getConfig());
+ this.system.getConfig());
initializeDeclarativeCache();
completedCacheXml = true;
} finally {
@@ -1256,7 +1196,7 @@ public class GemFireCacheImpl
}
}
- this.clientpf = null;
+ this.poolFactory = null;
startColocatedJmxManagerLocator();
@@ -1270,7 +1210,7 @@ public class GemFireCacheImpl
DEFAULT_CLIENT_FUNCTION_TIMEOUT);
clientFunctionTimeout = time >= 0 ? time : DEFAULT_CLIENT_FUNCTION_TIMEOUT;
- isInitialized = true;
+ this.isInitialized = true;
}
/**
@@ -1282,35 +1222,35 @@ public class GemFireCacheImpl
for (CacheService service : loader) {
service.init(this);
this.services.put(service.getInterface(), service);
- system.handleResourceEvent(ResourceEvent.CACHE_SERVICE_CREATE, service);
+ this.system.handleResourceEvent(ResourceEvent.CACHE_SERVICE_CREATE, service);
}
}
private boolean isNotJmxManager() {
- return (this.system.getConfig().getJmxManagerStart() != true);
+ return !this.system.getConfig().getJmxManagerStart();
}
private boolean isServerNode() {
- return (this.system.getDistributedMember().getVmKind() != DistributionManager.LOCATOR_DM_TYPE
+ return this.system.getDistributedMember().getVmKind() != DistributionManager.LOCATOR_DM_TYPE
&& this.system.getDistributedMember().getVmKind() != DistributionManager.ADMIN_ONLY_DM_TYPE
- && !isClient());
+ && !isClient();
}
private void startRestAgentServer(GemFireCacheImpl cache) {
if (this.system.getConfig().getStartDevRestApi() && isNotJmxManager() && isServerNode()) {
this.restAgent = new RestAgent(this.system.getConfig());
- restAgent.start(cache);
+ this.restAgent.start(cache);
} else {
this.restAgent = null;
}
}
private void startMemcachedServer() {
- int port = system.getConfig().getMemcachedPort();
+ int port = this.system.getConfig().getMemcachedPort();
if (port != 0) {
- String protocol = system.getConfig().getMemcachedProtocol();
+ String protocol = this.system.getConfig().getMemcachedProtocol();
assert protocol != null;
- String bindAddress = system.getConfig().getMemcachedBindAddress();
+ String bindAddress = this.system.getConfig().getMemcachedBindAddress();
assert bindAddress != null;
if (bindAddress.equals(DistributionConfig.DEFAULT_MEMCACHED_BIND_ADDRESS)) {
logger.info(LocalizedMessage.create(
@@ -1328,9 +1268,9 @@ public class GemFireCacheImpl
}
private void startRedisServer() {
- int port = system.getConfig().getRedisPort();
+ int port = this.system.getConfig().getRedisPort();
if (port != 0) {
- String bindAddress = system.getConfig().getRedisBindAddress();
+ String bindAddress = this.system.getConfig().getRedisBindAddress();
assert bindAddress != null;
if (bindAddress.equals(DistributionConfig.DEFAULT_REDIS_BIND_ADDRESS)) {
getLoggerI18n().info(
@@ -1346,7 +1286,6 @@ public class GemFireCacheImpl
}
}
-
@Override
public URL getCacheXmlURL() {
if (this.getMyId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
@@ -1356,11 +1295,11 @@ public class GemFireCacheImpl
if (xmlFile == null) {
xmlFile = this.system.getConfig().getCacheXmlFile();
}
- if ("".equals(xmlFile.getName())) {
+ if (xmlFile.getName().isEmpty()) {
return null;
}
- URL url = null;
+ URL url;
if (!xmlFile.exists() || !xmlFile.isFile()) {
// do a resource search
String resource = xmlFile.getPath();
@@ -1372,7 +1311,7 @@ public class GemFireCacheImpl
} else {
try {
url = xmlFile.toURL();
- } catch (IOException ex) {
+ } catch (MalformedURLException ex) {
throw new CacheXmlException(
LocalizedStrings.GemFireCache_COULD_NOT_CONVERT_XML_FILE_0_TO_AN_URL
.toLocalizedString(xmlFile),
@@ -1386,7 +1325,7 @@ public class GemFireCacheImpl
throw new CacheXmlException(
LocalizedStrings.GemFireCache_DECLARATIVE_CACHE_XML_FILERESOURCE_0_DOES_NOT_EXIST
.toLocalizedString(xmlFile));
- } else /* if (!xmlFile.isFile()) */ {
+ } else {
throw new CacheXmlException(
LocalizedStrings.GemFireCache_DECLARATIVE_XML_FILE_0_IS_NOT_A_FILE
.toLocalizedString(xmlFile));
@@ -1398,21 +1337,20 @@ public class GemFireCacheImpl
}
/**
- * Initializes the contents of this <code>Cache</code> according to the declarative caching XML
- * file specified by the given <code>DistributedSystem</code>. Note that this operation cannot be
- * performed in the constructor because creating regions in the cache, etc. uses the cache itself
- * (which isn't initialized until the constructor returns).
+ * Initializes the contents of this {@code Cache} according to the declarative caching XML file
+ * specified by the given {@code DistributedSystem}. Note that this operation cannot be performed
+ * in the constructor because creating regions in the cache, etc. uses the cache itself (which
+ * isn't initialized until the constructor returns).
*
* @throws CacheXmlException If something goes wrong while parsing the declarative caching XML
* file.
* @throws TimeoutException If a {@link org.apache.geode.cache.Region#put(Object, Object)}times
* out while initializing the cache.
- * @throws CacheWriterException If a <code>CacheWriterException</code> is thrown while
- * initializing the cache.
- * @throws RegionExistsException If the declarative caching XML file desribes a region that
- * already exists (including the root region).
- * @throws GatewayException If a <code>GatewayException</code> is thrown while initializing the
+ * @throws CacheWriterException If a {@code CacheWriterException} is thrown while initializing the
* cache.
+ * @throws RegionExistsException If the declarative caching XML file describes a region that
+ * already exists (including the root region).
+ * @throws GatewayException If a {@code GatewayException} is thrown while initializing the cache.
*
* @see #loadCacheXml
*/
@@ -1432,9 +1370,9 @@ public class GemFireCacheImpl
return; // nothing needs to be done
}
+ InputStream stream = null;
try {
logCacheXML(url, cacheXmlDescription);
- InputStream stream = null;
if (cacheXmlDescription != null) {
if (logger.isTraceEnabled()) {
logger.trace("initializing cache with generated XML: {}", cacheXmlDescription);
@@ -1444,40 +1382,57 @@ public class GemFireCacheImpl
stream = url.openStream();
}
loadCacheXml(stream);
- try {
- stream.close();
- } catch (IOException ignore) {
- }
+
} catch (IOException ex) {
throw new CacheXmlException(
LocalizedStrings.GemFireCache_WHILE_OPENING_CACHE_XML_0_THE_FOLLOWING_ERROR_OCCURRED_1
- .toLocalizedString(new Object[] {url.toString(), ex}));
+ .toLocalizedString(url.toString(), ex));
} catch (CacheXmlException ex) {
CacheXmlException newEx =
new CacheXmlException(LocalizedStrings.GemFireCache_WHILE_READING_CACHE_XML_0_1
- .toLocalizedString(new Object[] {url, ex.getMessage()}));
+ .toLocalizedString(url, ex.getMessage()));
+ /*
+ * TODO: why use setStackTrace and initCause? removal breaks several tests: OplogRVVJUnitTest,
+ * NewDeclarativeIndexCreationJUnitTest CacheXml70DUnitTest, CacheXml80DUnitTest,
+ * CacheXml81DUnitTest, CacheXmlGeode10DUnitTest RegionManagementDUnitTest
+ */
newEx.setStackTrace(ex.getStackTrace());
newEx.initCause(ex.getCause());
throw newEx;
+
+ } finally {
+ if (stream != null) {
+ try {
+ stream.close();
+ } catch (IOException ignore) {
+ }
+ }
}
}
- private void logCacheXML(URL url, String cacheXmlDescription) {
+ private static void logCacheXML(URL url, String cacheXmlDescription) {
if (cacheXmlDescription == null) {
StringBuilder sb = new StringBuilder();
+ BufferedReader br = null;
try {
- final String EOLN = System.getProperty("line.separator");
- BufferedReader br = new BufferedReader(new InputStreamReader(url.openStream()));
- String l = br.readLine();
- while (l != null) {
- if (!l.isEmpty()) {
- sb.append(EOLN).append(l);
+ final String lineSeparator = System.getProperty("line.separator");
+ br = new BufferedReader(new InputStreamReader(url.openStream()));
+ String line = br.readLine();
+ while (line != null) {
+ if (!line.isEmpty()) {
+ sb.append(lineSeparator).append(line);
}
- l = br.readLine();
+ line = br.readLine();
}
- br.close();
} catch (IOException ignore) {
+ } finally {
+ if (br != null) {
+ try {
+ br.close();
+ } catch (IOException ignore) {
+ }
+ }
}
logger.info(
LocalizedMessage.create(LocalizedStrings.GemFireCache_INITIALIZING_CACHE_USING__0__1,
@@ -1516,7 +1471,7 @@ public class GemFireCacheImpl
}
/**
- * create diskstore factory with default attributes
+ * create diskStore factory with default attributes
*
* @since GemFire prPersistSprint2
*/
@@ -1526,7 +1481,7 @@ public class GemFireCacheImpl
}
/**
- * create diskstore factory with predefined attributes
+ * create diskStore factory with predefined attributes
*
* @since GemFire prPersistSprint2
*/
@@ -1534,22 +1489,16 @@ public class GemFireCacheImpl
return new DiskStoreFactoryImpl(this, attrs);
}
- protected class Stopper extends CancelCriterion {
+ class Stopper extends CancelCriterion {
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.CancelCriterion#cancelInProgress()
- */
@Override
public String cancelInProgress() {
- String reason =
- GemFireCacheImpl.this.getDistributedSystem().getCancelCriterion().cancelInProgress();
+ String reason = getDistributedSystem().getCancelCriterion().cancelInProgress();
if (reason != null) {
return reason;
}
if (GemFireCacheImpl.this.disconnectCause != null) {
- return disconnectCause.getMessage();
+ return GemFireCacheImpl.this.disconnectCause.getMessage();
}
if (GemFireCacheImpl.this.isClosing) {
return "The cache is closed."; // this + ": closed";
@@ -1557,42 +1506,37 @@ public class GemFireCacheImpl
return null;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.CancelCriterion#generateCancelledException(java.lang.Throwable)
- */
@Override
- public RuntimeException generateCancelledException(Throwable e) {
+ public RuntimeException generateCancelledException(Throwable throwable) {
String reason = cancelInProgress();
if (reason == null) {
return null;
}
RuntimeException result =
- getDistributedSystem().getCancelCriterion().generateCancelledException(e);
+ getDistributedSystem().getCancelCriterion().generateCancelledException(throwable);
if (result != null) {
return result;
}
if (GemFireCacheImpl.this.disconnectCause == null) {
// No root cause, specify the one given and be done with it.
- return new CacheClosedException(reason, e);
+ return new CacheClosedException(reason, throwable);
}
- if (e == null) {
+ if (throwable == null) {
// Caller did not specify any root cause, so just use our own.
return new CacheClosedException(reason, GemFireCacheImpl.this.disconnectCause);
}
// Attempt to stick rootCause at tail end of the exception chain.
- Throwable nt = e;
+ Throwable nt = throwable;
while (nt.getCause() != null) {
nt = nt.getCause();
}
try {
nt.initCause(GemFireCacheImpl.this.disconnectCause);
- return new CacheClosedException(reason, e);
+ return new CacheClosedException(reason, throwable);
} catch (IllegalStateException e2) {
- // Bug 39496 (Jrockit related) Give up. The following
+ // Bug 39496 (JRockit related) Give up. The following
// error is not entirely sane but gives the correct general picture.
return new CacheClosedException(reason, GemFireCacheImpl.this.disconnectCause);
}
@@ -1603,7 +1547,7 @@ public class GemFireCacheImpl
@Override
public CancelCriterion getCancelCriterion() {
- return stopper;
+ return this.stopper;
}
/** return true if the cache was closed due to being shunned by other members */
@@ -1676,8 +1620,8 @@ public class GemFireCacheImpl
public static void emergencyClose() {
final boolean DEBUG = SystemFailure.TRACE_CLOSE;
- GemFireCacheImpl inst = GemFireCacheImpl.instance;
- if (inst == null) {
+ GemFireCacheImpl cache = GemFireCacheImpl.instance;
+ if (cache == null) {
if (DEBUG) {
System.err.println("GemFireCache#emergencyClose: no instance");
}
@@ -1687,10 +1631,9 @@ public class GemFireCacheImpl
GemFireCacheImpl.instance = null;
GemFireCacheImpl.pdxInstance = null;
// leave the PdxSerializer set if we have one to prevent 43412
- // TypeRegistry.setPdxSerializer(null);
// Shut down messaging first
- InternalDistributedSystem ids = inst.system;
+ InternalDistributedSystem ids = cache.system;
if (ids != null) {
if (DEBUG) {
System.err.println("DEBUG: emergencyClose InternalDistributedSystem");
@@ -1698,20 +1641,18 @@ public class GemFireCacheImpl
ids.emergencyClose();
}
- inst.disconnectCause = SystemFailure.getFailure();
- inst.isClosing = true;
+ cache.disconnectCause = SystemFailure.getFailure();
+ cache.isClosing = true;
// Clear cache servers
if (DEBUG) {
System.err.println("DEBUG: Close cache servers");
}
{
- Iterator allCacheServersItr = inst.allCacheServers.iterator();
- while (allCacheServersItr.hasNext()) {
- CacheServerImpl bs = (CacheServerImpl) allCacheServersItr.next();
- AcceptorImpl ai = bs.getAcceptor();
- if (ai != null) {
- ai.emergencyClose();
+ for (CacheServerImpl cacheServer : cache.allCacheServers) {
+ AcceptorImpl acceptor = cacheServer.getAcceptor();
+ if (acceptor != null) {
+ acceptor.emergencyClose();
}
}
}
@@ -1725,16 +1666,13 @@ public class GemFireCacheImpl
System.err.println("DEBUG: closing gateway hubs");
}
- // These are synchronized sets -- avoid potential deadlocks
- // instance.pathToRegion.clear(); // garbage collection
- // instance.gatewayHubs.clear();
-
// rootRegions is intentionally *not* synchronized. The
// implementation of clear() does not currently allocate objects.
- inst.rootRegions.clear();
+ cache.rootRegions.clear();
+
// partitionedRegions is intentionally *not* synchronized, The
// implementation of clear() does not currently allocate objects.
- inst.partitionedRegions.clear();
+ cache.partitionedRegions.clear();
if (DEBUG) {
System.err.println("DEBUG: done with cache emergency close");
}
@@ -1742,7 +1680,7 @@ public class GemFireCacheImpl
@Override
public boolean isCacheAtShutdownAll() {
- return isShutDownAll.get();
+ return this.isShutDownAll.get();
}
/**
@@ -1752,7 +1690,7 @@ public class GemFireCacheImpl
private static final int shutdownAllPoolSize =
Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "SHUTDOWN_ALL_POOL_SIZE", -1);
- void shutdownSubTreeGracefully(Map<String, PartitionedRegion> prSubMap) {
+ private void shutdownSubTreeGracefully(Map<String, PartitionedRegion> prSubMap) {
for (final PartitionedRegion pr : prSubMap.values()) {
shutDownOnePRGracefully(pr);
}
@@ -1782,27 +1720,23 @@ public class GemFireCacheImpl
boolean testIGE = Boolean.getBoolean("TestInternalGemFireError");
if (testIGE) {
- InternalGemFireError assErr = new InternalGemFireError(
+ throw new InternalGemFireError(
LocalizedStrings.GemFireCache_UNEXPECTED_EXCEPTION.toLocalizedString());
- throw assErr;
}
- // bug 44031 requires multithread shutdownall should be grouped
+ // bug 44031 requires multithread shutDownAll should be grouped
// by root region. However, shutDownAllDuringRecovery.conf test revealed that
// we have to close colocated child regions first.
// Now check all the PR, if anyone has colocate-with attribute, sort all the
// PRs by colocation relationship and close them sequentially, otherwise still
// group them by root region.
- TreeMap<String, Map<String, PartitionedRegion>> prTrees = getPRTrees();
+ SortedMap<String, Map<String, PartitionedRegion>> prTrees = getPRTrees();
if (prTrees.size() > 1 && shutdownAllPoolSize != 1) {
ExecutorService es = getShutdownAllExecutorService(prTrees.size());
for (final Map<String, PartitionedRegion> prSubMap : prTrees.values()) {
- es.execute(new Runnable() {
- @Override
- public void run() {
- ConnectionTable.threadWantsSharedResources();
- shutdownSubTreeGracefully(prSubMap);
- }
+ es.execute(() -> {
+ ConnectionTable.threadWantsSharedResources();
+ shutdownSubTreeGracefully(prSubMap);
});
} // for each root
es.shutdown();
@@ -1827,76 +1761,83 @@ public class GemFireCacheImpl
}
private ExecutorService getShutdownAllExecutorService(int size) {
- final ThreadGroup thrGrp = LoggingThreadGroup.createThreadGroup("ShutdownAllGroup", logger);
- ThreadFactory thrFactory = new ThreadFactory() {
+ final ThreadGroup threadGroup =
+ LoggingThreadGroup.createThreadGroup("ShutdownAllGroup", logger);
+ ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger threadCount = new AtomicInteger(1);
@Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(thrGrp, r, "ShutdownAll-" + threadCount.getAndIncrement());
- t.setDaemon(true);
- return t;
+ public Thread newThread(Runnable runnable) {
+ Thread thread =
+ new Thread(threadGroup, runnable, "ShutdownAll-" + this.threadCount.getAndIncrement());
+ thread.setDaemon(true);
+ return thread;
}
};
- ExecutorService es = Executors
- .newFixedThreadPool(shutdownAllPoolSize == -1 ? size : shutdownAllPoolSize, thrFactory);
- return es;
+ return Executors.newFixedThreadPool(shutdownAllPoolSize == -1 ? size : shutdownAllPoolSize,
+ threadFactory);
}
- private void shutDownOnePRGracefully(PartitionedRegion pr) {
+ private void shutDownOnePRGracefully(PartitionedRegion partitionedRegion) {
boolean acquiredLock = false;
try {
- pr.acquireDestroyLock();
+ partitionedRegion.acquireDestroyLock();
acquiredLock = true;
- synchronized (pr.getRedundancyProvider()) {
- if (pr.isDataStore() && pr.getDataStore() != null
- && pr.getDataPolicy() == DataPolicy.PERSISTENT_PARTITION) {
- int numBuckets = pr.getTotalNumberOfBuckets();
- Map<InternalDistributedMember, PersistentMemberID> bucketMaps[] = new Map[numBuckets];
- PartitionedRegionDataStore prds = pr.getDataStore();
+ synchronized (partitionedRegion.getRedundancyProvider()) {
+ if (partitionedRegion.isDataStore() && partitionedRegion.getDataStore() != null
+ && partitionedRegion.getDataPolicy() == DataPolicy.PERSISTENT_PARTITION) {
+ int numBuckets = partitionedRegion.getTotalNumberOfBuckets();
+ Map<InternalDistributedMember, PersistentMemberID>[] bucketMaps = new Map[numBuckets];
+ PartitionedRegionDataStore dataStore = partitionedRegion.getDataStore();
// lock all the primary buckets
- Set<Entry<Integer, BucketRegion>> bucketEntries = prds.getAllLocalBuckets();
+ Set<Entry<Integer, BucketRegion>> bucketEntries = dataStore.getAllLocalBuckets();
for (Map.Entry e : bucketEntries) {
- BucketRegion br = (BucketRegion) e.getValue();
- if (br == null || br.isDestroyed) {
+ BucketRegion bucket = (BucketRegion) e.getValue();
+ if (bucket == null || bucket.isDestroyed) {
// bucket region could be destroyed in race condition
continue;
}
- br.getBucketAdvisor().tryLockIfPrimary();
+ bucket.getBucketAdvisor().tryLockIfPrimary();
- // get map <InternalDistriutedMemeber, persistentID> for this bucket's
+ // get map <InternalDistributedMember, persistentID> for this bucket's
// remote members
- bucketMaps[br.getId()] = br.getBucketAdvisor().adviseInitializedPersistentMembers();
+ bucketMaps[bucket.getId()] =
+ bucket.getBucketAdvisor().adviseInitializedPersistentMembers();
if (logger.isDebugEnabled()) {
logger.debug("shutDownAll: PR {}: initialized persistent members for {}:{}",
- pr.getName(), br.getId(), bucketMaps[br.getId()]);
+ partitionedRegion.getName(), bucket.getId(), bucketMaps[bucket.getId()]);
}
}
if (logger.isDebugEnabled()) {
- logger.debug("shutDownAll: All buckets for PR {} are locked.", pr.getName());
+ logger.debug("shutDownAll: All buckets for PR {} are locked.",
+ partitionedRegion.getName());
}
// send lock profile update to other members
- pr.setShutDownAllStatus(PartitionedRegion.PRIMARY_BUCKETS_LOCKED);
- new UpdateAttributesProcessor(pr).distribute(false);
- pr.getRegionAdvisor().waitForProfileStatus(PartitionedRegion.PRIMARY_BUCKETS_LOCKED);
+ partitionedRegion.setShutDownAllStatus(PartitionedRegion.PRIMARY_BUCKETS_LOCKED);
+ new UpdateAttributesProcessor(partitionedRegion).distribute(false);
+ partitionedRegion.getRegionAdvisor()
+ .waitForProfileStatus(PartitionedRegion.PRIMARY_BUCKETS_LOCKED);
if (logger.isDebugEnabled()) {
- logger.debug("shutDownAll: PR {}: all bucketlock profiles received.", pr.getName());
+ logger.debug("shutDownAll: PR {}: all bucketLock profiles received.",
+ partitionedRegion.getName());
}
// if async write, do flush
- if (!pr.getAttributes().isDiskSynchronous()) {
- // several PRs might share the same diskstore, we will only flush once
+ if (!partitionedRegion.getAttributes().isDiskSynchronous()) {
+ // several PRs might share the same diskStore, we will only flush once
// even flush is called several times.
- pr.getDiskStore().forceFlush();
+ partitionedRegion.getDiskStore().forceFlush();
// send flush profile update to other members
- pr.setShutDownAllStatus(PartitionedRegion.DISK_STORE_FLUSHED);
- new UpdateAttributesProcessor(pr).distribute(false);
- pr.getRegionAdvisor().waitForProfileStatus(PartitionedRegion.DISK_STORE_FLUSHED);
+ partitionedRegion.setShutDownAllStatus(PartitionedRegion.DISK_STORE_FLUSHED);
+ new UpdateAttributesProcessor(partitionedRegion).distribute(false);
+ partitionedRegion.getRegionAdvisor()
+ .waitForProfileStatus(PartitionedRegion.DISK_STORE_FLUSHED);
if (logger.isDebugEnabled()) {
- logger.debug("shutDownAll: PR {}: all flush profiles received.", pr.getName());
+ logger.debug("shutDownAll: PR {}: all flush profiles received.",
+ partitionedRegion.getName());
}
} // async write
@@ -1904,41 +1845,43 @@ public class GemFireCacheImpl
// iterate through all the bucketMaps and exclude the items whose
// idm is no longer online
Set<InternalDistributedMember> membersToPersistOfflineEqual =
- pr.getRegionAdvisor().adviseDataStore();
+ partitionedRegion.getRegionAdvisor().adviseDataStore();
for (Map.Entry e : bucketEntries) {
- BucketRegion br = (BucketRegion) e.getValue();
- if (br == null || br.isDestroyed) {
+ BucketRegion bucket = (BucketRegion) e.getValue();
+ if (bucket == null || bucket.isDestroyed) {
// bucket region could be destroyed in race condition
continue;
}
Map<InternalDistributedMember, PersistentMemberID> persistMap =
- getSubMapForLiveMembers(pr, membersToPersistOfflineEqual, bucketMaps[br.getId()]);
+ getSubMapForLiveMembers(membersToPersistOfflineEqual, bucketMaps[bucket.getId()]);
if (persistMap != null) {
- br.getPersistenceAdvisor().persistMembersOfflineAndEqual(persistMap);
+ bucket.getPersistenceAdvisor().persistMembersOfflineAndEqual(persistMap);
if (logger.isDebugEnabled()) {
- logger.debug("shutDownAll: PR {}: pesisting bucket {}:{}", pr.getName(), br.getId(),
- persistMap);
+ logger.debug("shutDownAll: PR {}: persisting bucket {}:{}",
+ partitionedRegion.getName(), bucket.getId(), persistMap);
}
}
}
- // send persited profile update to other members, let all members to persist
+ // send persisted profile update to other members, let all members to persist
// before close the region
- pr.setShutDownAllStatus(PartitionedRegion.OFFLINE_EQUAL_PERSISTED);
- new UpdateAttributesProcessor(pr).distribute(false);
- pr.getRegionAdvisor().waitForProfileStatus(PartitionedRegion.OFFLINE_EQUAL_PERSISTED);
+ partitionedRegion.setShutDownAllStatus(PartitionedRegion.OFFLINE_EQUAL_PERSISTED);
+ new UpdateAttributesProcessor(partitionedRegion).distribute(false);
+ partitionedRegion.getRegionAdvisor()
+ .waitForProfileStatus(PartitionedRegion.OFFLINE_EQUAL_PERSISTED);
if (logger.isDebugEnabled()) {
- logger.debug("shutDownAll: PR {}: all offline_equal profiles received.", pr.getName());
+ logger.debug("shutDownAll: PR {}: all offline_equal profiles received.",
+ partitionedRegion.getName());
}
- } // datastore
+ } // dataStore
- // after done all steps for buckets, close pr
+ // after done all steps for buckets, close partitionedRegion
// close accessor directly
- RegionEventImpl event =
- new RegionEventImpl(pr, Operation.REGION_CLOSE, null, false, getMyId(), true);
+ RegionEventImpl event = new RegionEventImpl(partitionedRegion, Operation.REGION_CLOSE, null,
+ false, getMyId(), true);
try {
// not to acquire lock
- pr.basicDestroyRegion(event, false, false, true);
+ partitionedRegion.basicDestroyRegion(event, false, false, true);
} catch (CacheWriterException e) {
// not possible with local operation, CacheWriter not called
throw new Error(
@@ -1952,36 +1895,33 @@ public class GemFireCacheImpl
.toLocalizedString(),
e);
}
- // pr.close();
} // synchronized
} catch (CacheClosedException cce) {
logger.debug("Encounter CacheClosedException when shutDownAll is closing PR: {}:{}",
- pr.getFullPath(), cce.getMessage());
+ partitionedRegion.getFullPath(), cce.getMessage());
} catch (CancelException ce) {
logger.debug("Encounter CancelException when shutDownAll is closing PR: {}:{}",
- pr.getFullPath(), ce.getMessage());
+ partitionedRegion.getFullPath(), ce.getMessage());
} catch (RegionDestroyedException rde) {
logger.debug("Encounter CacheDestroyedException when shutDownAll is closing PR: {}:{}",
- pr.getFullPath(), rde.getMessage());
+ partitionedRegion.getFullPath(), rde.getMessage());
} finally {
if (acquiredLock) {
- pr.releaseDestroyLock();
+ partitionedRegion.releaseDestroyLock();
}
}
}
- private Map<InternalDistributedMember, PersistentMemberID> getSubMapForLiveMembers(
- PartitionedRegion pr, Set<InternalDistributedMember> membersToPersistOfflineEqual,
+ private static Map<InternalDistributedMember, PersistentMemberID> getSubMapForLiveMembers(
+ Set<InternalDistributedMember> membersToPersistOfflineEqual,
Map<InternalDistributedMember, PersistentMemberID> bucketMap) {
if (bucketMap == null) {
return null;
}
- Map<InternalDistributedMember, PersistentMemberID> persistMap = new HashMap();
- Iterator itor = membersToPersistOfflineEqual.iterator();
- while (itor.hasNext()) {
- InternalDistributedMember idm = (InternalDistributedMember) itor.next();
- if (bucketMap.containsKey(idm)) {
- persistMap.put(idm, bucketMap.get(idm));
+ Map<InternalDistributedMember, PersistentMemberID> persistMap = new HashMap<>();
+ for (InternalDistributedMember member : membersToPersistOfflineEqual) {
+ if (bucketMap.containsKey(member)) {
+ persistMap.put(member, bucketMap.get(member));
}
}
return persistMap;
@@ -1992,13 +1932,13 @@ public class GemFireCacheImpl
close(false);
}
- public void close(String reason, boolean keepalive, boolean keepDS) {
- close(reason, null, keepalive, keepDS);
+ public void close(String reason, boolean keepAlive, boolean keepDS) {
+ close(reason, null, keepAlive, keepDS);
}
@Override
- public void close(boolean keepalive) {
- close("Normal disconnect", null, keepalive, false);
+ public void close(boolean keepAlive) {
+ close("Normal disconnect", null, keepAlive, false);
}
public void close(String reason, Throwable optionalCause) {
@@ -2098,7 +2038,7 @@ public class GemFireCacheImpl
public OffHeapEvictor getOffHeapEvictor() {
synchronized (this.offHeapEvictorLock) {
- stopper.checkCancelInProgress(null);
+ this.stopper.checkCancelInProgress(null);
if (this.offHeapEvictor == null) {
this.offHeapEvictor = new OffHeapEvictor(this);
}
@@ -2108,37 +2048,26 @@ public class GemFireCacheImpl
@Override
public PersistentMemberManager getPersistentMemberManager() {
- return persistentMemberManager;
+ return this.persistentMemberManager;
}
@Override
public ClientMetadataService getClientMetadataService() {
synchronized (this.clientMetaDatServiceLock) {
- stopper.checkCancelInProgress(null);
- if (this.clientMetadatService == null) {
- this.clientMetadatService = new ClientMetadataService(this);
+ this.stopper.checkCancelInProgress(null);
+ if (this.clientMetadataService == null) {
+ this.clientMetadataService = new ClientMetadataService(this);
}
- return this.clientMetadatService;
+ return this.clientMetadataService;
}
}
private final boolean DISABLE_DISCONNECT_DS_ON_CACHE_CLOSE = Boolean
.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "DISABLE_DISCONNECT_DS_ON_CACHE_CLOSE");
- /**
- * close the cache
- *
- * @param reason the reason the cache is being closed
- * @param systemFailureCause whether this member was ejected from the distributed system
- * @param keepalive whoever added this should javadoc it
- */
- public void close(String reason, Throwable systemFailureCause, boolean keepalive) {
- close(reason, systemFailureCause, keepalive, false);
- }
-
- public void close(String reason, Throwable systemFailureCause, boolean keepalive,
+ public void close(String reason, Throwable systemFailureCause, boolean keepAlive,
boolean keepDS) {
- securityService.close();
+ this.securityService.close();
if (isClosed()) {
return;
@@ -2146,7 +2075,7 @@ public class GemFireCacheImpl
final boolean isDebugEnabled = logger.isDebugEnabled();
synchronized (GemFireCacheImpl.class) {
- // bugfix for bug 36512 "GemFireCache.close is not thread safe"
+ // fix for bug 36512 "GemFireCache.close is not thread safe"
// ALL CODE FOR CLOSE SHOULD NOW BE UNDER STATIC SYNCHRONIZATION
// OF synchronized (GemFireCache.class) {
// static synchronization is necessary due to static resources
@@ -2154,14 +2083,14 @@ public class GemFireCacheImpl
return;
}
- /**
+ /*
* First close the ManagementService as it uses a lot of infra which will be closed by
* cache.close()
- **/
+ */
system.handleResourceEvent(ResourceEvent.CACHE_REMOVE, this);
- if (this.listener != null) {
- this.system.removeResourceListener(listener);
- this.listener = null;
+ if (this.resourceEventsListener != null) {
+ this.system.removeResourceListener(resourceEventsListener);
+ this.resourceEventsListener = null;
}
if (systemFailureCause != null) {
@@ -2173,7 +2102,7 @@ public class GemFireCacheImpl
}
}
- this.keepAlive = keepalive;
+ this.keepAlive = keepAlive;
isClosing = true;
logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCache_0_NOW_CLOSING, this));
@@ -2191,12 +2120,12 @@ public class GemFireCacheImpl
TXStateProxy tx = null;
try {
- if (this.txMgr != null) {
- tx = this.txMgr.internalSuspend();
+ if (this.transactionManager != null) {
+ tx = this.transactionManager.internalSuspend();
}
// do this before closing regions
- resourceManager.close();
+ this.resourceManager.close();
try {
this.resourceAdvisor.close();
@@ -2209,11 +2138,10 @@ public class GemFireCacheImpl
// ignore
}
- GatewaySenderAdvisor advisor = null;
for (GatewaySender sender : this.getAllGatewaySenders()) {
try {
sender.stop();
- advisor = ((AbstractGatewaySender) sender).getSenderAdvisor();
+ GatewaySenderAdvisor advisor = ((AbstractGatewaySender) sender).getSenderAdvisor();
if (advisor != null) {
if (isDebugEnabled) {
logger.debug("Stopping the GatewaySender advisor");
@@ -2265,9 +2193,9 @@ public class GemFireCacheImpl
GemFireCacheImpl.pdxInstance = null;
}
- List rootRegionValues = null;
+ List<LocalRegion> rootRegionValues;
synchronized (this.rootRegions) {
- rootRegionValues = new ArrayList(this.rootRegions.values());
+ rootRegionValues = new ArrayList<>(this.rootRegions.values());
}
{
final Operation op;
@@ -2281,8 +2209,7 @@ public class GemFireCacheImpl
LocalRegion prRoot = null;
- for (Iterator itr = rootRegionValues.iterator(); itr.hasNext();) {
- LocalRegion lr = (LocalRegion) itr.next();
+ for (LocalRegion lr : rootRegionValues) {
if (isDebugEnabled) {
logger.debug("{}: processing region {}", this, lr.getFullPath());
}
@@ -2338,7 +2265,7 @@ public class GemFireCacheImpl
LocalizedStrings.GemFireCache_FAILED_TO_GET_THE_CQSERVICE_TO_CLOSE_DURING_CACHE_CLOSE_1));
}
- PoolManager.close(keepalive);
+ PoolManager.close(keepAlive);
if (isDebugEnabled) {
logger.debug("{}: notifying admins of close...", this);
@@ -2360,7 +2287,7 @@ public class GemFireCacheImpl
DM dm = null;
try {
dm = system.getDistributionManager();
- dm.removeMembershipListener(this.txMgr);
+ dm.removeMembershipListener(this.transactionManager);
} catch (CancelException e) {
// dm = null;
}
@@ -2390,7 +2317,7 @@ public class GemFireCacheImpl
// NO MORE Distributed Messaging AFTER THIS POINT!!!!
{
- ClientMetadataService cms = this.clientMetadatService;
+ ClientMetadataService cms = this.clientMetadataService;
if (cms != null) {
cms.close();
}
@@ -2403,20 +2330,6 @@ public class GemFireCacheImpl
// make sure the disk stores get closed
closeDiskStores();
// NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
-
- // okay, we're taking too long to do this stuff, so let's
- // be mean to other processes and skip the rest of the messaging
- // phase
- // [bruce] the following code is unnecessary since someone put the
- // same actions in a finally block
- // if (!this.closed) {
- // this.closed = true;
- // this.txMgr.close();
- // if (GemFireCache.instance == this) {
- // GemFireCache.instance = null;
- // }
- // ((DynamicRegionFactoryImpl)DynamicRegionFactory.get()).close();
- // }
}
// Close the CqService Handle.
@@ -2448,12 +2361,12 @@ public class GemFireCacheImpl
} finally {
// NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
- if (this.txMgr != null) {
- this.txMgr.close();
+ if (this.transactionManager != null) {
+ this.transactionManager.close();
}
((DynamicRegionFactoryImpl) DynamicRegionFactory.get()).close();
- if (this.txMgr != null) {
- this.txMgr.internalResume(tx);
+ if (this.transactionManager != null) {
+ this.transactionManager.internalResume(tx);
}
TXCommitMessage.getTracker().clearForCacheClose();
}
@@ -2470,8 +2383,7 @@ public class GemFireCacheImpl
// do this late to prevent 43412
TypeRegistry.setPdxSerializer(null);
- for (Iterator iter = cacheLifecycleListeners.iterator(); iter.hasNext();) {
- CacheLifecycleListener listener = (CacheLifecycleListener) iter.next();
+ for (CacheLifecycleListener listener : cacheLifecycleListeners) {
listener.cacheClosed(this);
}
// Fix for #49856
@@ -2482,13 +2394,11 @@ public class GemFireCacheImpl
}
- // see Cache.isReconnecting()
@Override
public boolean isReconnecting() {
return this.system.isReconnecting();
}
- // see Cache.waitUntilReconnected(long, TimeUnit)
@Override
public boolean waitUntilReconnected(long time, TimeUnit units) throws InterruptedException {
boolean systemReconnected = this.system.waitUntilReconnected(time, units);
@@ -2496,10 +2406,7 @@ public class GemFireCacheImpl
return false;
}
GemFireCacheImpl cache = getInstance();
- if (cache == null || !cache.isInitialized()) {
- return false;
- }
- return true;
+ return cache != null && cache.isInitialized();
}
@Override
@@ -2509,14 +2416,14 @@ public class GemFireCacheImpl
@Override
public Cache getReconnectedCache() {
- GemFireCacheImpl c = GemFireCacheImpl.getInstance();
- if (c == null) {
+ GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ if (cache == null) {
return null;
}
- if (c == this || !c.isInitialized()) {
- c = null;
+ if (cache == this || !cache.isInitialized()) {
+ cache = null;
}
- return c;
+ return cache;
}
private void stopMemcachedServer() {
@@ -2544,16 +2451,16 @@ public class GemFireCacheImpl
private void prepareDiskStoresForClose() {
String pdxDSName = TypeRegistry.getPdxDiskStoreName(this);
- DiskStoreImpl pdxdsi = null;
+ DiskStoreImpl pdxDiskStore = null;
for (DiskStoreImpl dsi : this.diskStores.values()) {
if (dsi.getName().equals(pdxDSName)) {
- pdxdsi = dsi;
+ pdxDiskStore = dsi;
} else {
dsi.prepareForClose();
}
}
- if (pdxdsi != null) {
- pdxdsi.prepareForClose();
+ if (pdxDiskStore != null) {
+ pdxDiskStore.prepareForClose();
}
}
@@ -2561,48 +2468,33 @@ public class GemFireCacheImpl
* Used to guard access to compactorPool and set to true when cache is shutdown.
*/
private final AtomicBoolean diskStoreTaskSync = new AtomicBoolean(false);
+
/**
- * Lazily initialized.
+ * Lazily initialized. TODO: this is always null
*/
private ThreadPoolExecutor diskStoreTaskPool = null;
- private void createDiskStoreTaskPool() {
- int MAXT = DiskStoreImpl.MAX_CONCURRENT_COMPACTIONS;
- final ThreadGroup compactThreadGroup =
- LoggingThreadGroup.createThreadGroup("Oplog Compactor Thread Group", logger);
- /*
- * final ThreadFactory compactThreadFactory = new ThreadFactory() { public Thread
- * newThread(Runnable command) { Thread thread = new Thread(compactThreadGroup, command,
- * "Idle OplogCompactor"); thread.setDaemon(true); return thread; } };
- */
+ private final ConcurrentMap<String, DiskStoreImpl> diskStores = new ConcurrentHashMap<>();
- final ThreadFactory compactThreadFactory =
- GemfireCacheHelper.CreateThreadFactory(compactThreadGroup, "Idle OplogCompactor");
- this.diskStoreTaskPool = new ThreadPoolExecutor(MAXT, MAXT, 1, TimeUnit.SECONDS,
- new LinkedBlockingQueue(), compactThreadFactory);
- }
-
- private final ConcurrentMap<String, DiskStoreImpl> diskStores =
- new ConcurrentHashMap<String, DiskStoreImpl>();
private final ConcurrentMap<String, DiskStoreImpl> regionOwnedDiskStores =
- new ConcurrentHashMap<String, DiskStoreImpl>();
+ new ConcurrentHashMap<>();
- public void addDiskStore(DiskStoreImpl dsi) {
+ void addDiskStore(DiskStoreImpl dsi) {
this.diskStores.put(dsi.getName(), dsi);
if (!dsi.isOffline()) {
getDiskStoreMonitor().addDiskStore(dsi);
}
}
- public void removeDiskStore(DiskStoreImpl dsi) {
+ void removeDiskStore(DiskStoreImpl dsi) {
this.diskStores.remove(dsi.getName());
this.regionOwnedDiskStores.remove(dsi.getName());
- /** Added for M&M **/
+ // Added for M&M
if (!dsi.getOwnedByRegion())
system.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, dsi);
}
- public void addRegionOwnedDiskStore(DiskStoreImpl dsi) {
+ void addRegionOwnedDiskStore(DiskStoreImpl dsi) {
this.regionOwnedDiskStores.put(dsi.getName(), dsi);
if (!dsi.isOffline()) {
getDiskStoreMonitor().addDiskStore(dsi);
@@ -2618,7 +2510,7 @@ public class GemFireCacheImpl
logger.debug("closing {}", dsi);
}
dsi.close();
- /** Added for M&M **/
+ // Added for M&M
system.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, dsi);
} catch (Exception e) {
logger.fatal(
@@ -2635,13 +2527,6 @@ public class GemFireCacheImpl
DEFAULT_DS_NAME = dsName;
}
- /**
- * Used by unit tests to undo a change to the default disk store name.
- */
- public static void unsetDefaultDiskStoreName() {
- DEFAULT_DS_NAME = DiskStoreFactory.DEFAULT_DISK_STORE_NAME;
- }
-
public static String getDefaultDiskStoreName() {
return DEFAULT_DS_NAME;
}
@@ -2687,138 +2572,49 @@ public class GemFireCacheImpl
@Override
public Collection<DiskStoreImpl> listDiskStoresIncludingRegionO
<TRUNCATED>