You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/07/12 20:01:06 UTC
[7/8] incubator-geode git commit: GEODE-420: Clean up of
SocketCreator code in tests. SocketCreatorFactory currently singleton,
to amend at later stage
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
index a672127..5e2ce3f 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
@@ -17,7 +17,43 @@
package com.gemstone.gemfire.distributed.internal;
-import com.gemstone.gemfire.*;
+import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Reader;
+import java.lang.reflect.Array;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.StringTokenizer;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.ForcedDisconnectException;
+import com.gemstone.gemfire.GemFireConfigException;
+import com.gemstone.gemfire.GemFireIOException;
+import com.gemstone.gemfire.LogWriter;
+import com.gemstone.gemfire.StatisticDescriptor;
+import com.gemstone.gemfire.Statistics;
+import com.gemstone.gemfire.StatisticsType;
+import com.gemstone.gemfire.StatisticsTypeFactory;
+import com.gemstone.gemfire.SystemConnectException;
+import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.admin.AlertLevel;
import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.CacheFactory;
@@ -34,7 +70,19 @@ import com.gemstone.gemfire.distributed.internal.membership.QuorumChecker;
import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager;
import com.gemstone.gemfire.i18n.LogWriterI18n;
-import com.gemstone.gemfire.internal.*;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.DSFIDFactory;
+import com.gemstone.gemfire.internal.DummyStatisticsImpl;
+import com.gemstone.gemfire.internal.GemFireStatSampler;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
+import com.gemstone.gemfire.internal.InternalInstantiator;
+import com.gemstone.gemfire.internal.LinuxProcFsStatistics;
+import com.gemstone.gemfire.internal.LocalStatisticsImpl;
+import com.gemstone.gemfire.internal.OsStatisticsFactory;
+import com.gemstone.gemfire.internal.StatisticsImpl;
+import com.gemstone.gemfire.internal.StatisticsManager;
+import com.gemstone.gemfire.internal.StatisticsTypeFactoryImpl;
+import com.gemstone.gemfire.internal.SystemTimer;
import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
import com.gemstone.gemfire.internal.cache.CacheConfig;
import com.gemstone.gemfire.internal.cache.CacheServerImpl;
@@ -53,6 +101,7 @@ import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogWriterAppender;
import com.gemstone.gemfire.internal.logging.log4j.LogWriterAppenders;
+import com.gemstone.gemfire.internal.net.SocketCreatorFactory;
import com.gemstone.gemfire.internal.offheap.MemoryAllocator;
import com.gemstone.gemfire.internal.offheap.OffHeapStorage;
import com.gemstone.gemfire.internal.tcp.ConnectionTable;
@@ -60,37 +109,18 @@ import com.gemstone.gemfire.internal.util.concurrent.StoppableCondition;
import com.gemstone.gemfire.internal.util.concurrent.StoppableReentrantLock;
import com.gemstone.gemfire.management.ManagementException;
import com.gemstone.gemfire.security.GemFireSecurityException;
-import org.apache.logging.log4j.Logger;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Reader;
-import java.lang.reflect.Array;
-import java.net.InetAddress;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.gemstone.gemfire.distributed.ConfigurationProperties.LOCATORS;
-import static com.gemstone.gemfire.distributed.ConfigurationProperties.MCAST_PORT;
/**
* The concrete implementation of {@link DistributedSystem} that
* provides internal-only functionality.
- *
* @since GemFire 3.0
- *
*/
-public class InternalDistributedSystem
- extends DistributedSystem
- implements OsStatisticsFactory, StatisticsManager
-{
+public class InternalDistributedSystem extends DistributedSystem implements OsStatisticsFactory, StatisticsManager {
+
private static final Logger logger = LogService.getLogger();
public static final String DISABLE_MANAGEMENT_PROPERTY = DistributionConfig.GEMFIRE_PREFIX + "disableManagement";
-
+
/**
* If auto-reconnect is going on this will hold a reference to it
*/
@@ -105,46 +135,60 @@ public class InternalDistributedSystem
// the following is overridden from DistributedTestCase to fix #51058
public static final AtomicReference<CreationStackGenerator> TEST_CREATION_STACK_GENERATOR = new AtomicReference<CreationStackGenerator>(DEFAULT_CREATION_STACK_GENERATOR);
-
- /** The distribution manager that is used to communicate with the
- * distributed system. */
+
+ /**
+ * The distribution manager that is used to communicate with the
+ * distributed system.
+ */
protected DM dm;
private final GrantorRequestProcessor.GrantorRequestContext grc;
+
public GrantorRequestProcessor.GrantorRequestContext getGrantorRequestContext() {
return grc;
}
-
- /** Numeric id that identifies this node in a DistributedSystem */
+
+ /**
+ * Numeric id that identifies this node in a DistributedSystem
+ */
private long id;
- /** The log writer used to log information messages */
+ /**
+ * The log writer used to log information messages
+ */
protected InternalLogWriter logWriter = null;
- /** The log writer used to log security related messages */
+ /**
+ * The log writer used to log security related messages
+ */
protected InternalLogWriter securityLogWriter = null;
-
- /** Distributed System clock */
+
+ /**
+ * Distributed System clock
+ */
private DSClock clock;
-// /** The log writer was provided by an external entity */
-// private boolean externalLogWriterProvided = false;
-//
+ // /** The log writer was provided by an external entity */
+ // private boolean externalLogWriterProvided = false;
+ //
private LogWriterAppender logWriterAppender = null;
private LogWriterAppender securityLogWriterAppender = null;
-
- /** Time this system was created */
+
+ /**
+ * Time this system was created
+ */
private final long startTime;
/**
* Guards access to {@link #isConnected}
*/
protected final Object isConnectedMutex = new Object();
-
- /** Is this <code>DistributedSystem</code> connected to a
- * distributed system?
- *
+
+ /**
+ * Is this <code>DistributedSystem</code> connected to a
+ * distributed system?
+ * <p>
* Concurrency: volatile for reads and protected by synchronization of {@link #isConnectedMutex} for writes
*/
protected volatile boolean isConnected;
@@ -155,28 +199,35 @@ public class InternalDistributedSystem
*/
private boolean isLoner = false;
- /** The sampler for this DistributedSystem.
+ /**
+ * The sampler for this DistributedSystem.
*/
private GemFireStatSampler sampler = null;
- /** A set of listeners that are invoked when this connection to the
- * distributed system is disconnected */
+ /**
+ * A set of listeners that are invoked when this connection to the
+ * distributed system is disconnected
+ */
private final Set listeners = new LinkedHashSet(); // needs to be ordered
- /** Set of listeners that are invoked whenever a connection is created to
- * the distributed system */
+ /**
+ * Set of listeners that are invoked whenever a connection is created to
+ * the distributed system
+ */
private static Set connectListeners = new LinkedHashSet(); // needs to be ordered
-
- /** auto-reconnect listeners */
+
+ /**
+ * auto-reconnect listeners
+ */
private static List<ReconnectListener> reconnectListeners = new ArrayList<ReconnectListener>();
-
+
/**
- * whether this DS is one created to reconnect to the distributed
+ * whether this DS is one created to reconnect to the distributed
* system after a forced-disconnect. This state is cleared once reconnect
* is successful.
*/
private boolean isReconnectingDS;
-
+
/**
* During a reconnect attempt this is used to perform quorum checks
* before allowing a location service to be started up in this JVM.
@@ -185,6 +236,7 @@ public class InternalDistributedSystem
*/
private QuorumChecker quorumChecker;
+
/**
* A Constant that matches the ThreadGroup name of the shutdown hook.
* This constant is used to insure consistency with LoggingThreadGroup.
@@ -204,28 +256,34 @@ public class InternalDistributedSystem
//////////////////// Configuration Fields ////////////////////
- /** The config object used to create this distributed system */
+ /**
+ * The config object used to create this distributed system
+ */
private final DistributionConfig originalConfig;
- /** The config object to which most configuration work is delegated */
+ /**
+ * The config object to which most configuration work is delegated
+ */
private DistributionConfig config;
private final boolean statsDisabled = Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "statsDisabled");
private volatile boolean shareSockets = DistributionConfig.DEFAULT_CONSERVE_SOCKETS;
- /** if this distributed system starts a locator, it is stored here */
+ /**
+ * if this distributed system starts a locator, it is stored here
+ */
private InternalLocator startedLocator;
-
+
private List<ResourceEventsListener> resourceListeners;
-
+
private final boolean disableManagement = Boolean.getBoolean(DISABLE_MANAGEMENT_PROPERTY);
/**
* Stack trace showing the creation of this instance of InternalDistributedSystem.
*/
private final Throwable creationStack;
-
+
///////////////////// Static Methods /////////////////////
/**
@@ -237,12 +295,12 @@ public class InternalDistributedSystem
InternalDataSerializer.checkSerializationVersion();
try {
SystemFailure.startThreads();
- InternalDistributedSystem newSystem = new InternalDistributedSystem(config);
- newSystem.initialize();
- reconnectAttemptCounter = 0; // reset reconnect count since we just got a new connection
- notifyConnectListeners(newSystem);
- success = true;
- return newSystem;
+ InternalDistributedSystem newSystem = new InternalDistributedSystem(config);
+ newSystem.initialize();
+ reconnectAttemptCounter = 0; // reset reconnect count since we just got a new connection
+ notifyConnectListeners(newSystem);
+ success = true;
+ return newSystem;
} finally {
if (!success) {
LoggingThreadGroup.cleanUpThreadGroups(); // bug44365 - logwriters accumulate, causing mem leak
@@ -250,8 +308,8 @@ public class InternalDistributedSystem
}
}
}
-
-
+
+
/**
* creates a non-functional instance for testing
* @param nonDefault - non-default distributed system properties
@@ -268,7 +326,6 @@ public class InternalDistributedSystem
* Returns a connection to the distributed system that is suitable
* for administration. For administration, we are not as strict
* when it comes to existing connections.
- *
* @since GemFire 4.0
*/
public static DistributedSystem connectForAdmin(Properties props) {
@@ -279,7 +336,7 @@ public class InternalDistributedSystem
* Returns a connected distributed system for this VM, or null
* if there is no connected distributed system in this VM.
* This method synchronizes on the existingSystems collection.
- *
+ * <p>
* <p>author bruce
* @since GemFire 5.0
*/
@@ -287,10 +344,10 @@ public class InternalDistributedSystem
InternalDistributedSystem result = null;
synchronized (existingSystemsLock) {
if (!existingSystems.isEmpty()) {
- InternalDistributedSystem existingSystem =
- (InternalDistributedSystem) existingSystems.get(0);
- if (existingSystem.isConnected())
+ InternalDistributedSystem existingSystem = (InternalDistributedSystem) existingSystems.get(0);
+ if (existingSystem.isConnected()) {
result = existingSystem;
+ }
}
}
return result;
@@ -333,7 +390,7 @@ public class InternalDistributedSystem
}
return null;
}
-
+
public static InternalLogWriter getStaticInternalLogWriter() {
InternalDistributedSystem sys = getAnyInstance();
if (sys != null) {
@@ -341,11 +398,11 @@ public class InternalDistributedSystem
}
return null;
}
-
+
public InternalLogWriter getInternalLogWriter() {
return this.logWriter;
}
-
+
public static InternalLogWriter getStaticSecurityInternalLogWriter() {
InternalDistributedSystem sys = getAnyInstance();
if (sys != null) {
@@ -353,7 +410,7 @@ public class InternalDistributedSystem
}
return null;
}
-
+
public InternalLogWriter getSecurityInternalLogWriter() {
InternalDistributedSystem sys = getAnyInstance();
if (sys != null) {
@@ -361,8 +418,10 @@ public class InternalDistributedSystem
}
return null;
}
-
- /** reset the reconnectAttempt counter for a new go at reconnecting */
+
+ /**
+ * reset the reconnectAttempt counter for a new go at reconnecting
+ */
private static void resetReconnectAttemptCounter() {
reconnectAttemptCounter = 0;
}
@@ -376,10 +435,8 @@ public class InternalDistributedSystem
* the "default" values of properties. See {@link
* DistributedSystem#connect} for a list of exceptions that may be
* thrown.
- *
- * @param nonDefault
- * The non-default configuration properties specified by the
- * caller
+ * @param nonDefault The non-default configuration properties specified by the
+ * caller
*
* @see DistributedSystem#connect
*/
@@ -392,14 +449,14 @@ public class InternalDistributedSystem
Object o = nonDefault.remove(DistributionConfig.DS_RECONNECTING_NAME);
if (o instanceof Boolean) {
- this.isReconnectingDS = ((Boolean)o).booleanValue();
+ this.isReconnectingDS = ((Boolean) o).booleanValue();
} else {
this.isReconnectingDS = false;
}
-
+
o = nonDefault.remove(DistributionConfig.DS_QUORUM_CHECKER_NAME);
if (o instanceof QuorumChecker) {
- this.quorumChecker = (QuorumChecker)o;
+ this.quorumChecker = (QuorumChecker) o;
}
o = nonDefault.remove(DistributionConfig.DS_CONFIG_NAME);
@@ -408,46 +465,42 @@ public class InternalDistributedSystem
} else {
this.originalConfig = new DistributionConfigImpl(nonDefault);
}
-
- ((DistributionConfigImpl)this.originalConfig).checkForDisallowedDefaults(); // throws IllegalStateEx
+
+ ((DistributionConfigImpl) this.originalConfig).checkForDisallowedDefaults(); // throws IllegalStateEx
this.shareSockets = this.originalConfig.getConserveSockets();
this.startTime = System.currentTimeMillis();
this.grc = new GrantorRequestProcessor.GrantorRequestContext(stopper);
-
+
elderLock = new StoppableReentrantLock(stopper);
elderLockCondition = elderLock.newCondition();
this.creationStack = TEST_CREATION_STACK_GENERATOR.get().generateCreationStack(this.originalConfig);
-// if (DistributionConfigImpl.multicastTest) {
-// this.logger.warning("Use of multicast has been forced");
-// }
-// if (DistributionConfigImpl.forceDisableTcp) {
-// this.logger.warning("Use of UDP has been forced");
-// }
-// if (com.gemstone.gemfire.distributed.internal.membership.jgroup.JGroupMembershipManager.multicastTest) {
-// this.logger.warning("Use of multicast for all distributed cache operations has been forced");
-// }
+ // if (DistributionConfigImpl.multicastTest) {
+ // this.logger.warning("Use of multicast has been forced");
+ // }
+ // if (DistributionConfigImpl.forceDisableTcp) {
+ // this.logger.warning("Use of UDP has been forced");
+ // }
+ // if (com.gemstone.gemfire.distributed.internal.membership.jgroup.JGroupMembershipManager.multicastTest) {
+ // this.logger.warning("Use of multicast for all distributed cache operations has been forced");
+ // }
}
//////////////////// Instance Methods ////////////////////
-
+
/**
* Registers a listener to the system
- *
- * @param listener
- * listener to be added
+ * @param listener listener to be added
*/
public void addResourceListener(ResourceEventsListener listener) {
resourceListeners.add(listener);
}
-
+
/**
* Un-Registers a listener to the system
- *
- * @param listener
- * listener to be removed
+ * @param listener listener to be removed
*/
public void removeResourceListener(ResourceEventsListener listener) {
resourceListeners.remove(listener);
@@ -462,11 +515,8 @@ public class InternalDistributedSystem
/**
* Handles a particular event associated with a resource
- *
- * @param event
- * Resource event
- * @param resource
- * resource on which event is generated
+ * @param event Resource event
+ * @param resource resource on which event is generated
*/
public void handleResourceEvent(ResourceEvent event, Object resource) {
if (disableManagement) {
@@ -478,17 +528,19 @@ public class InternalDistributedSystem
notifyResourceEventListeners(event, resource);
}
- /** Returns true if system is a loner (for testing) */
+ /**
+ * Returns true if system is a loner (for testing)
+ */
public boolean isLoner() {
return this.isLoner;
}
private MemoryAllocator offHeapStore = null;
-
+
public MemoryAllocator getOffHeapStore() {
return this.offHeapStore;
}
-
+
/**
* Initializes this connection to a distributed system with the
* current configuration state.
@@ -496,11 +548,7 @@ public class InternalDistributedSystem
private void initialize() {
if (this.originalConfig.getLocators().equals("")) {
if (this.originalConfig.getMcastPort() != 0) {
- throw new GemFireConfigException("The "
- + LOCATORS
- + " attribute can not be empty when the "
- + MCAST_PORT
- + " attribute is non-zero.");
+ throw new GemFireConfigException("The " + LOCATORS + " attribute can not be empty when the " + MCAST_PORT + " attribute is non-zero.");
} else {
// no distribution
this.isLoner = true;
@@ -512,180 +560,172 @@ public class InternalDistributedSystem
this.attemptingToReconnect = (reconnectAttemptCounter > 0);
}
try {
- SocketCreator.getDefaultInstance(this.config);
-
- // LOG: create LogWriterAppender(s) if log-file or security-log-file is specified
- final boolean hasLogFile = this.config.getLogFile() != null && !this.config.getLogFile().equals(new File(""));
- final boolean hasSecurityLogFile = this.config.getSecurityLogFile() != null && !this.config.getSecurityLogFile().equals(new File(""));
- LogService.configureLoggers(hasLogFile, hasSecurityLogFile);
- if (hasLogFile || hasSecurityLogFile) {
-
- // main log file
- if (hasLogFile) {
- // if log-file then create logWriterAppender
- this.logWriterAppender = LogWriterAppenders.getOrCreateAppender(LogWriterAppenders.Identifier.MAIN, this.isLoner, this.config, true);
- }
-
- // security log file
- if (hasSecurityLogFile) {
- // if security-log-file then create securityLogWriterAppender
- this.securityLogWriterAppender = LogWriterAppenders.getOrCreateAppender(LogWriterAppenders.Identifier.SECURITY, this.isLoner, this.config, false);
- } else {
- // let security route to regular log-file or stdout
- }
- }
-
- // LOG: create LogWriterLogger(s) for backwards compatibility of getLogWriter and getSecurityLogWriter
- if (this.logWriter == null) {
- this.logWriter = LogWriterFactory.createLogWriterLogger(this.isLoner, false, this.config, true);
- this.logWriter.fine("LogWriter is created.");
- }
-
-// logWriter.info("Created log writer for IDS@"+System.identityHashCode(this));
-
- if (this.securityLogWriter == null) {
- // LOG: whole new LogWriterLogger instance for security
- this.securityLogWriter = LogWriterFactory.createLogWriterLogger(this.isLoner, true, this.config, false);
- this.securityLogWriter.fine("SecurityLogWriter is created.");
- }
-
- Services.setLogWriter(this.logWriter);
- Services.setSecurityLogWriter(this.securityLogWriter);
-
- this.clock = new DSClock(this.isLoner);
-
- if (this.attemptingToReconnect && logger.isDebugEnabled()) {
- logger.debug("This thread is initializing a new DistributedSystem in order to reconnect to other members");
- }
- // Note we need loners to load the license in case they are a
- // bridge server and will need to enforce the member limit
- if (Boolean.getBoolean(InternalLocator.FORCE_LOCATOR_DM_TYPE)) {
- this.locatorDMTypeForced = true;
- }
+ SocketCreatorFactory.setDistributionConfig(config);
+
+ // LOG: create LogWriterAppender(s) if log-file or security-log-file is specified
+ final boolean hasLogFile = this.config.getLogFile() != null && !this.config.getLogFile().equals(new File(""));
+ final boolean hasSecurityLogFile = this.config.getSecurityLogFile() != null && !this.config.getSecurityLogFile().equals(new File(""));
+ LogService.configureLoggers(hasLogFile, hasSecurityLogFile);
+ if (hasLogFile || hasSecurityLogFile) {
+
+ // main log file
+ if (hasLogFile) {
+ // if log-file then create logWriterAppender
+ this.logWriterAppender = LogWriterAppenders.getOrCreateAppender(LogWriterAppenders.Identifier.MAIN, this.isLoner, this.config, true);
+ }
- // Initialize the Diffie-Hellman and public/private keys
- try {
- HandShake.initCertsMap(this.config.getSecurityProps());
- HandShake.initPrivateKey(this.config.getSecurityProps());
- HandShake.initDHKeys(this.config);
- }
- catch (Exception ex) {
- throw new GemFireSecurityException(
- LocalizedStrings.InternalDistributedSystem_PROBLEM_IN_INITIALIZING_KEYS_FOR_CLIENT_AUTHENTICATION.toLocalizedString(), ex);
- }
-
- final long offHeapMemorySize = OffHeapStorage.parseOffHeapMemorySize(getConfig().getOffHeapMemorySize());
-
- this.offHeapStore = OffHeapStorage.createOffHeapStorage(this, offHeapMemorySize, this);
-
- // Note: this can only happen on a linux system
- if (getConfig().getLockMemory()) {
- // This calculation is not exact, but seems fairly close. So far we have
- // not loaded much into the heap and the current RSS usage is already
- // included the available memory calculation.
- long avail = LinuxProcFsStatistics.getAvailableMemory(logger);
- long size = offHeapMemorySize + Runtime.getRuntime().totalMemory();
- if (avail < size) {
- if (GemFireCacheImpl.ALLOW_MEMORY_LOCK_WHEN_OVERCOMMITTED) {
- logger.warn(LocalizedMessage.create(LocalizedStrings.InternalDistributedSystem_MEMORY_OVERCOMMIT_WARN, size - avail));
+ // security log file
+ if (hasSecurityLogFile) {
+ // if security-log-file then create securityLogWriterAppender
+ this.securityLogWriterAppender = LogWriterAppenders.getOrCreateAppender(LogWriterAppenders.Identifier.SECURITY, this.isLoner, this.config, false);
} else {
- throw new IllegalStateException(LocalizedStrings.InternalDistributedSystem_MEMORY_OVERCOMMIT.toLocalizedString(avail, size));
+ // let security route to regular log-file or stdout
}
}
-
- logger.info("Locking memory. This may take a while...");
- GemFireCacheImpl.lockMemory();
- logger.info("Finished locking memory.");
- }
- try {
- startInitLocator();
- } catch (InterruptedException e) {
- throw new SystemConnectException("Startup has been interrupted", e);
- }
+ // LOG: create LogWriterLogger(s) for backwards compatibility of getLogWriter and getSecurityLogWriter
+ if (this.logWriter == null) {
+ this.logWriter = LogWriterFactory.createLogWriterLogger(this.isLoner, false, this.config, true);
+ this.logWriter.fine("LogWriter is created.");
+ }
- synchronized (this.isConnectedMutex) {
- this.isConnected = true;
- }
-
- if (!this.isLoner) {
- try {
- if (this.quorumChecker != null) {
- this.quorumChecker.suspend();
- }
- this.dm = DistributionManager.create(this);
- // fix bug #46324
- if (InternalLocator.hasLocator()) {
- InternalLocator locator = InternalLocator.getLocator();
- getDistributionManager().addHostedLocators(getDistributedMember(), InternalLocator.getLocatorStrings(), locator.isSharedConfigurationEnabled());
- }
+ // logWriter.info("Created log writer for IDS@"+System.identityHashCode(this));
+
+ if (this.securityLogWriter == null) {
+ // LOG: whole new LogWriterLogger instance for security
+ this.securityLogWriter = LogWriterFactory.createLogWriterLogger(this.isLoner, true, this.config, false);
+ this.securityLogWriter.fine("SecurityLogWriter is created.");
}
- finally {
- if (this.dm == null && this.quorumChecker != null) {
- this.quorumChecker.resume();
- }
- setDisconnected();
+
+ Services.setLogWriter(this.logWriter);
+ Services.setSecurityLogWriter(this.securityLogWriter);
+
+ this.clock = new DSClock(this.isLoner);
+
+ if (this.attemptingToReconnect && logger.isDebugEnabled()) {
+ logger.debug("This thread is initializing a new DistributedSystem in order to reconnect to other members");
+ }
+ // Note we need loners to load the license in case they are a
+ // bridge server and will need to enforce the member limit
+ if (Boolean.getBoolean(InternalLocator.FORCE_LOCATOR_DM_TYPE)) {
+ this.locatorDMTypeForced = true;
}
- }
- else {
- this.dm = new LonerDistributionManager(this, this.logWriter);
- }
-
- Assert.assertTrue(this.dm != null);
- Assert.assertTrue(this.dm.getSystem() == this);
- try {
- this.id = this.dm.getChannelId();
- } catch (DistributedSystemDisconnectedException e) {
- // bug #48144 - The dm's channel threw an NPE. It now throws this exception
- // but during startup we should instead throw a SystemConnectException
- throw new SystemConnectException(
- LocalizedStrings.InternalDistributedSystem_DISTRIBUTED_SYSTEM_HAS_DISCONNECTED
- .toLocalizedString(), e);
- }
+ // Initialize the Diffie-Hellman and public/private keys
+ try {
+ HandShake.initCertsMap(this.config.getSecurityProps());
+ HandShake.initPrivateKey(this.config.getSecurityProps());
+ HandShake.initDHKeys(this.config);
+ } catch (Exception ex) {
+ throw new GemFireSecurityException(LocalizedStrings.InternalDistributedSystem_PROBLEM_IN_INITIALIZING_KEYS_FOR_CLIENT_AUTHENTICATION.toLocalizedString(), ex);
+ }
+
+ final long offHeapMemorySize = OffHeapStorage.parseOffHeapMemorySize(getConfig().getOffHeapMemorySize());
+
+ this.offHeapStore = OffHeapStorage.createOffHeapStorage(this, offHeapMemorySize, this);
+
+ // Note: this can only happen on a linux system
+ if (getConfig().getLockMemory()) {
+ // This calculation is not exact, but seems fairly close. So far we have
+ // not loaded much into the heap and the current RSS usage is already
+ // included the available memory calculation.
+ long avail = LinuxProcFsStatistics.getAvailableMemory(logger);
+ long size = offHeapMemorySize + Runtime.getRuntime().totalMemory();
+ if (avail < size) {
+ if (GemFireCacheImpl.ALLOW_MEMORY_LOCK_WHEN_OVERCOMMITTED) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.InternalDistributedSystem_MEMORY_OVERCOMMIT_WARN, size - avail));
+ } else {
+ throw new IllegalStateException(LocalizedStrings.InternalDistributedSystem_MEMORY_OVERCOMMIT.toLocalizedString(avail, size));
+ }
+ }
+
+ logger.info("Locking memory. This may take a while...");
+ GemFireCacheImpl.lockMemory();
+ logger.info("Finished locking memory.");
+ }
- synchronized (this.isConnectedMutex) {
- this.isConnected = true;
- }
- if (attemptingToReconnect && (this.startedLocator == null)) {
try {
startInitLocator();
} catch (InterruptedException e) {
throw new SystemConnectException("Startup has been interrupted", e);
}
- }
- try {
- endInitLocator();
- }
- catch (IOException e) {
- throw new GemFireIOException("Problem finishing a locator service start", e);
- }
+
+ synchronized (this.isConnectedMutex) {
+ this.isConnected = true;
+ }
+
+ if (!this.isLoner) {
+ try {
+ if (this.quorumChecker != null) {
+ this.quorumChecker.suspend();
+ }
+ this.dm = DistributionManager.create(this);
+ // fix bug #46324
+ if (InternalLocator.hasLocator()) {
+ InternalLocator locator = InternalLocator.getLocator();
+ getDistributionManager().addHostedLocators(getDistributedMember(), InternalLocator.getLocatorStrings(), locator.isSharedConfigurationEnabled());
+ }
+ } finally {
+ if (this.dm == null && this.quorumChecker != null) {
+ this.quorumChecker.resume();
+ }
+ setDisconnected();
+ }
+ } else {
+ this.dm = new LonerDistributionManager(this, this.logWriter);
+ }
+
+ Assert.assertTrue(this.dm != null);
+ Assert.assertTrue(this.dm.getSystem() == this);
+
+ try {
+ this.id = this.dm.getChannelId();
+ } catch (DistributedSystemDisconnectedException e) {
+ // bug #48144 - The dm's channel threw an NPE. It now throws this exception
+ // but during startup we should instead throw a SystemConnectException
+ throw new SystemConnectException(LocalizedStrings.InternalDistributedSystem_DISTRIBUTED_SYSTEM_HAS_DISCONNECTED.toLocalizedString(), e);
+ }
+
+ synchronized (this.isConnectedMutex) {
+ this.isConnected = true;
+ }
+ if (attemptingToReconnect && (this.startedLocator == null)) {
+ try {
+ startInitLocator();
+ } catch (InterruptedException e) {
+ throw new SystemConnectException("Startup has been interrupted", e);
+ }
+ }
+ try {
+ endInitLocator();
+ } catch (IOException e) {
+ throw new GemFireIOException("Problem finishing a locator service start", e);
+ }
if (!statsDisabled) {
// to fix bug 42527 we need a sampler
// even if sampling is not enabled.
- this.sampler = new GemFireStatSampler(this);
- this.sampler.start();
- }
+ this.sampler = new GemFireStatSampler(this);
+ this.sampler.start();
+ }
- if (this.logWriterAppender != null) {
- LogWriterAppenders.startupComplete(LogWriterAppenders.Identifier.MAIN);
- }
- if (this.securityLogWriterAppender != null) {
- LogWriterAppenders.startupComplete(LogWriterAppenders.Identifier.SECURITY);
- }
-
- //this.logger.info("ds created", new RuntimeException("DEBUG: STACK"));
+ if (this.logWriterAppender != null) {
+ LogWriterAppenders.startupComplete(LogWriterAppenders.Identifier.MAIN);
+ }
+ if (this.securityLogWriterAppender != null) {
+ LogWriterAppenders.startupComplete(LogWriterAppenders.Identifier.SECURITY);
+ }
- //Log any instantiators that were registered before the log writer
- //was created
- InternalInstantiator.logInstantiators();
- }
- catch (RuntimeException ex) {
+ //this.logger.info("ds created", new RuntimeException("DEBUG: STACK"));
+
+ //Log any instantiators that were registered before the log writer
+ //was created
+ InternalInstantiator.logInstantiators();
+ } catch (RuntimeException ex) {
this.config.close();
throw ex;
}
-
+
resourceListeners = new CopyOnWriteArrayList<ResourceEventsListener>();
this.reconnected = this.attemptingToReconnect;
this.attemptingToReconnect = false;
@@ -702,7 +742,7 @@ public class InternalDistributedSystem
if (attemptingToReconnect && !this.isConnected) {
if (this.quorumChecker != null) {
logger.info("performing a quorum check to see if location services can be started early");
- if (!quorumChecker.checkForQuorum(3*this.config.getMemberTimeout())) {
+ if (!quorumChecker.checkForQuorum(3 * this.config.getMemberTimeout())) {
logger.info("quorum check failed - not allowing location services to start early");
return;
}
@@ -711,16 +751,10 @@ public class InternalDistributedSystem
}
DistributionLocatorId locId = new DistributionLocatorId(locatorString);
try {
- this.startedLocator = InternalLocator.createLocator(
- locId.getPort(),
- null,
- null,
- this.logWriter, // LOG: this is after IDS has created LogWriterLoggers and Appenders
- this.securityLogWriter, // LOG: this is after IDS has created LogWriterLoggers and Appenders
- locId.getHost(),
- locId.getHostnameForClients(),
- this.originalConfig.toProperties(), false);
- if(locId.isPeerLocator()) {
+ this.startedLocator = InternalLocator.createLocator(locId.getPort(), null, null, this.logWriter, // LOG: this is after IDS has created LogWriterLoggers and Appenders
+ this.securityLogWriter, // LOG: this is after IDS has created LogWriterLoggers and Appenders
+ locId.getHost(), locId.getHostnameForClients(), this.originalConfig.toProperties(), false);
+ if (locId.isPeerLocator()) {
boolean startedPeerLocation = false;
try {
this.startedLocator.startPeerLocation(true);
@@ -732,13 +766,12 @@ public class InternalDistributedSystem
}
}
} catch (IOException e) {
- throw new GemFireIOException( LocalizedStrings.
- InternalDistributedSystem_PROBLEM_STARTING_A_LOCATOR_SERVICE
- .toLocalizedString(), e);
+ throw new GemFireIOException(LocalizedStrings.
+ InternalDistributedSystem_PROBLEM_STARTING_A_LOCATOR_SERVICE.toLocalizedString(), e);
}
}
}
-
+
/**
* @since GemFire 5.7
*/
@@ -746,7 +779,7 @@ public class InternalDistributedSystem
InternalLocator loc = this.startedLocator;
if (loc != null) {
String locatorString = this.originalConfig.getStartLocator();
-// DistributionLocatorId locId = new DistributionLocatorId(locatorString);
+ // DistributionLocatorId locId = new DistributionLocatorId(locatorString);
boolean finished = false;
try {
// during the period when the product is using only paper licenses we always
@@ -755,7 +788,7 @@ public class InternalDistributedSystem
// if(locId.isServerLocator()) {
loc.startServerLocation(this);
// }
-
+
loc.endStartLocator(this);
finished = true;
} finally {
@@ -766,12 +799,15 @@ public class InternalDistributedSystem
}
}
- /** record a locator as a dependent of this distributed system */
+ /**
+ * record a locator as a dependent of this distributed system
+ */
public void setDependentLocator(InternalLocator theLocator) {
this.startedLocator = theLocator;
}
-
- /** Used by DistributionManager to fix bug 33362
+
+ /**
+ * Used by DistributionManager to fix bug 33362
*/
void setDM(DM dm) {
this.dm = dm;
@@ -780,14 +816,12 @@ public class InternalDistributedSystem
/**
* Checks whether or not this connection to a distributed system is
* closed.
- *
- * @throws DistributedSystemDisconnectedException
- * This connection has been {@link #disconnect(boolean, String, boolean) disconnected}
+ * @throws DistributedSystemDisconnectedException This connection has been {@link #disconnect(boolean, String, boolean) disconnected}
*/
private void checkConnected() {
if (!isConnected()) {
- throw new DistributedSystemDisconnectedException(LocalizedStrings.InternalDistributedSystem_THIS_CONNECTION_TO_A_DISTRIBUTED_SYSTEM_HAS_BEEN_DISCONNECTED.toLocalizedString(),
- dm.getRootCause());
+ throw new DistributedSystemDisconnectedException(LocalizedStrings.InternalDistributedSystem_THIS_CONNECTION_TO_A_DISTRIBUTED_SYSTEM_HAS_BEEN_DISCONNECTED.toLocalizedString(), dm
+ .getRootCause());
}
}
@@ -804,11 +838,12 @@ public class InternalDistributedSystem
}
return this.isConnected;
}
-
+
/**
* This class defers to the DM. If we don't have a DM, we're dead.
*/
protected class Stopper extends CancelCriterion {
+
@Override
public String cancelInProgress() {
checkFailure();
@@ -817,7 +852,7 @@ public class InternalDistributedSystem
}
return dm.getCancelCriterion().cancelInProgress();
}
-
+
@Override
public RuntimeException generateCancelledException(Throwable e) {
if (dm == null) {
@@ -826,17 +861,17 @@ public class InternalDistributedSystem
return dm.getCancelCriterion().generateCancelledException(e);
}
}
-
+
/**
* Handles all cancellation queries for this distributed system
*/
private final Stopper stopper = new Stopper();
-
+
@Override
public CancelCriterion getCancelCriterion() {
return stopper;
}
-
+
public boolean isDisconnecting() {
if (this.dm == null) {
return true;
@@ -848,13 +883,13 @@ public class InternalDistributedSystem
return true;
}
return this.isDisconnecting;
- }
+ }
@Override
public LogWriter getLogWriter() {
return this.logWriter;
}
-
+
public DSClock getClock() {
return this.clock;
}
@@ -874,16 +909,19 @@ public class InternalDistributedSystem
return myCache;
}
*/
+
/**
* Returns the stat sampler
*/
public GemFireStatSampler getStatSampler() {
return this.sampler;
}
-
- /** Has this system started the disconnect process? */
+
+ /**
+ * Has this system started the disconnect process?
+ */
protected volatile boolean isDisconnecting = false;
-
+
/**
* Disconnects this VM from the distributed system. Shuts down the
* distribution manager, and if necessary,
@@ -892,7 +930,7 @@ public class InternalDistributedSystem
public void disconnect() {
disconnect(false, LocalizedStrings.InternalDistributedSystem_NORMAL_DISCONNECT.toLocalizedString(), false);
}
-
+
/**
* Disconnects this member from the distributed system when an internal
* error has caused distribution to fail (e.g., this member was shunned)
@@ -913,31 +951,26 @@ public class InternalDistributedSystem
disconnect(false, reason, shunned);
}
}
-
+
/**
* This is how much time, in milliseconds to allow a disconnect listener
* to run before we interrupt it.
*/
- static private final long MAX_DISCONNECT_WAIT =
- Long.getLong("DistributionManager.DISCONNECT_WAIT",
- 10 * 1000).longValue();
+ static private final long MAX_DISCONNECT_WAIT = Long.getLong("DistributionManager.DISCONNECT_WAIT", 10 * 1000).longValue();
/**
* Run a disconnect listener, checking for errors and
* honoring the timeout {@link #MAX_DISCONNECT_WAIT}.
- *
* @param dc the listener to run
*/
- private void runDisconnect(final DisconnectListener dc,
- ThreadGroup tg) {
+ private void runDisconnect(final DisconnectListener dc, ThreadGroup tg) {
// Create a general handler for running the disconnect
Runnable r = new Runnable() {
public void run() {
try {
disconnectListenerThread.set(Boolean.TRUE);
dc.onDisconnect(InternalDistributedSystem.this);
- }
- catch (CancelException e) {
+ } catch (CancelException e) {
if (logger.isDebugEnabled()) {
logger.debug("Disconnect listener <{}> thwarted by cancellation: {}", dc, e, logger.isTraceEnabled() ? e : null);
}
@@ -950,8 +983,7 @@ public class InternalDistributedSystem
try {
t.start();
t.join(MAX_DISCONNECT_WAIT);
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn(LocalizedMessage.create(LocalizedStrings.InternalDistributedSystem_INTERRUPTED_WHILE_PROCESSING_DISCONNECT_LISTENER), e);
}
@@ -963,34 +995,30 @@ public class InternalDistributedSystem
try {
t.join(MAX_DISCONNECT_WAIT);
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
-
+
if (t.isAlive()) {
logger.warn(LocalizedMessage.create(LocalizedStrings.InternalDistributedSystem_DISCONNECT_LISTENER_IGNORED_ITS_INTERRUPT__0, dc));
}
}
-
+
}
-
+
public boolean isDisconnectListenerThread() {
- Boolean disconnectListenerThreadBoolean =
- (Boolean) this.disconnectListenerThread.get();
+ Boolean disconnectListenerThreadBoolean = (Boolean) this.disconnectListenerThread.get();
- return disconnectListenerThreadBoolean != null &&
- disconnectListenerThreadBoolean.booleanValue();
+ return disconnectListenerThreadBoolean != null && disconnectListenerThreadBoolean.booleanValue();
}
-
+
/**
* Run a disconnect listener in the same thread sequence as the reconnect.
* @param dc the listener to run
* @param tg the thread group to run the listener in
*/
- private void runDisconnectForReconnect(final DisconnectListener dc,
- ThreadGroup tg){
+ private void runDisconnectForReconnect(final DisconnectListener dc, ThreadGroup tg) {
try {
dc.onDisconnect(InternalDistributedSystem.this);
} catch (DistributedSystemDisconnectedException e) {
@@ -1003,36 +1031,34 @@ public class InternalDistributedSystem
/**
* A logging thread group for the disconnect and shutdown listeners
*/
- private final ThreadGroup disconnectListenerThreadGroup =
- LoggingThreadGroup.createThreadGroup("Disconnect Listeners");
-
+ private final ThreadGroup disconnectListenerThreadGroup = LoggingThreadGroup.createThreadGroup("Disconnect Listeners");
+
/**
* Disconnect cache, run disconnect listeners.
- *
* @param doReconnect whether a reconnect will be done
* @param reason the reason that the system is disconnecting
- *
+ *
* @return a collection of shutdownListeners
*/
private HashSet doDisconnects(boolean doReconnect, String reason) {
// Make a pass over the disconnect listeners, asking them _politely_
// to clean up.
HashSet shutdownListeners = new HashSet();
- for (;;) {
+ for (; ; ) {
DisconnectListener listener = null;
synchronized (this.listeners) {
Iterator itr = listeners.iterator();
if (!itr.hasNext()) {
break;
}
- listener = (DisconnectListener)itr.next();
+ listener = (DisconnectListener) itr.next();
if (listener instanceof ShutdownListener) {
shutdownListeners.add(listener);
}
itr.remove();
} // synchronized
-
- if (doReconnect){
+
+ if (doReconnect) {
runDisconnectForReconnect(listener, disconnectListenerThreadGroup);
} else {
runDisconnect(listener, disconnectListenerThreadGroup);
@@ -1040,12 +1066,11 @@ public class InternalDistributedSystem
} // for
return shutdownListeners;
}
-
+
/**
* Process the shutdown listeners. It is essential that the DM has been
* shut down before calling this step, to ensure that no new listeners are
* registering.
- *
* @param shutdownListeners shutdown listeners initially registered with us
*/
private void doShutdownListeners(HashSet shutdownListeners) {
@@ -1056,17 +1081,15 @@ public class InternalDistributedSystem
// Process any shutdown listeners we reaped during first pass
Iterator it = shutdownListeners.iterator();
while (it.hasNext()) {
- ShutdownListener s = (ShutdownListener)it.next();
+ ShutdownListener s = (ShutdownListener) it.next();
try {
s.onShutdown(this);
- }
- catch (VirtualMachineError err) {
+ } catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
@@ -1082,18 +1105,20 @@ public class InternalDistributedSystem
// disconnect listeners may have appeared. After messagingDisabled is
// set, no new ones will be created. However, we must process any
// that appeared in the interim.
- for (;;) {
+ for (; ; ) {
// Pluck next listener from the list
DisconnectListener dcListener = null;
ShutdownListener sdListener = null;
synchronized (this.listeners) {
Iterator itr = listeners.iterator();
- if (!itr.hasNext())
+ if (!itr.hasNext()) {
break;
- dcListener = (DisconnectListener)itr.next();
+ }
+ dcListener = (DisconnectListener) itr.next();
itr.remove();
- if (dcListener instanceof ShutdownListener)
- sdListener = (ShutdownListener)dcListener;
+ if (dcListener instanceof ShutdownListener) {
+ sdListener = (ShutdownListener) dcListener;
+ }
}
// Run the disconnect
@@ -1104,14 +1129,12 @@ public class InternalDistributedSystem
try {
// TODO: should we make sure this times out?
sdListener.onShutdown(this);
- }
- catch (VirtualMachineError err) {
+ } catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
@@ -1124,26 +1147,26 @@ public class InternalDistributedSystem
}
} // for
}
-
+
/**
* break any potential circularity in {@link #loadEmergencyClasses()}
*/
private static volatile boolean emergencyClassesLoaded = false;
-
+
/**
* Ensure that the MembershipManager class gets loaded.
- *
* @see SystemFailure#loadEmergencyClasses()
*/
static public void loadEmergencyClasses() {
- if (emergencyClassesLoaded) return;
+ if (emergencyClassesLoaded) {
+ return;
+ }
emergencyClassesLoaded = true;
GMSMembershipManager.loadEmergencyClasses();
}
-
+
/**
* Closes the membership manager
- *
* @see SystemFailure#emergencyClose()
*/
public void emergencyClose() {
@@ -1160,7 +1183,7 @@ public class InternalDistributedSystem
}
}
}
-
+
// Garbage collection
// Leave dm alone; its CancelCriterion will help people die
this.isConnected = false;
@@ -1173,7 +1196,7 @@ public class InternalDistributedSystem
System.err.println("DEBUG: done with InternalDistributedSystem#emergencyClose");
}
}
-
+
private void setDisconnected() {
synchronized (this.isConnectedMutex) {
this.isConnected = false;
@@ -1187,13 +1210,10 @@ public class InternalDistributedSystem
boolean interrupted = Thread.interrupted();
try {
this.isConnectedMutex.wait();
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
interrupted = true;
- getLogWriter().convertToLogWriterI18n().warning(
- LocalizedStrings.InternalDistributedSystem_DISCONNECT_WAIT_INTERRUPTED, e);
- }
- finally {
+ getLogWriter().convertToLogWriterI18n().warning(LocalizedStrings.InternalDistributedSystem_DISCONNECT_WAIT_INTERRUPTED, e);
+ } finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
@@ -1205,27 +1225,22 @@ public class InternalDistributedSystem
/**
* Disconnects this VM from the distributed system. Shuts down the
* distribution manager.
- *
- * @param preparingForReconnect
- * true if called by a reconnect operation
- * @param reason
- * the reason the disconnect is being performed
- * @param keepAlive
- * true if user requested durable subscriptions are to be retained at
- * server.
+ * @param preparingForReconnect true if called by a reconnect operation
+ * @param reason the reason the disconnect is being performed
+ * @param keepAlive true if user requested durable subscriptions are to be retained at
+ * server.
*/
protected void disconnect(boolean preparingForReconnect, String reason, boolean keepAlive) {
- boolean isShutdownHook = (shutdownHook != null)
- && (Thread.currentThread() == shutdownHook);
+ boolean isShutdownHook = (shutdownHook != null) && (Thread.currentThread() == shutdownHook);
if (!preparingForReconnect) {
-// logger.info("disconnecting IDS@"+System.identityHashCode(this));
- synchronized(reconnectListeners) {
+ // logger.info("disconnecting IDS@"+System.identityHashCode(this));
+ synchronized (reconnectListeners) {
reconnectListeners.clear();
}
cancelReconnect();
}
-
+
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
HashSet shutdownListeners = null;
@@ -1244,21 +1259,18 @@ public class InternalDistributedSystem
disconnectListenerThread.set(Boolean.TRUE); // bug #42663 - this must be set while closing the cache
try {
currentCache.close(reason, dm.getRootCause(), keepAlive, true); // fix for 42150
- }
- catch (VirtualMachineError e) {
+ } catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
- }
- catch (Throwable e) {
+ } catch (Throwable e) {
SystemFailure.checkFailure();
// Whenever you catch Error or Throwable, you must also
// check for fatal JVM error (see above). However, there is
logger.warn(LocalizedMessage.create(LocalizedStrings.InternalDistributedSystem_EXCEPTION_TRYING_TO_CLOSE_CACHE), e);
- }
- finally {
+ } finally {
disconnectListenerThread.set(Boolean.FALSE);
}
- }
+ }
// While still holding the lock, make sure this instance is
// marked as shutting down
@@ -1289,7 +1301,7 @@ public class InternalDistributedSystem
if (!isShutdownHook) {
shutdownListeners = doDisconnects(attemptingToReconnect, reason);
}
-
+
if (!this.attemptingToReconnect) {
if (this.logWriterAppender != null) {
LogWriterAppenders.stop(LogWriterAppenders.Identifier.MAIN);
@@ -1299,11 +1311,10 @@ public class InternalDistributedSystem
LogWriterAppenders.stop(LogWriterAppenders.Identifier.SECURITY);
}
}
-
+
AlertAppender.getInstance().shuttingDown();
-
- }
- finally { // be ABSOLUTELY CERTAIN that dm closed
+
+ } finally { // be ABSOLUTELY CERTAIN that dm closed
try {
// Do the bulk of the close...
this.dm.close();
@@ -1327,7 +1338,7 @@ public class InternalDistributedSystem
}
// closing the Aggregate stats
- if(functionServiceStats != null){
+ if (functionServiceStats != null) {
functionServiceStats.close();
}
// closing individual function stats
@@ -1356,23 +1367,21 @@ public class InternalDistributedSystem
LoggingThreadGroup.cleanUpThreadGroups(); // bug35388 - logwriters accumulate, causing mem leak
EventID.unsetDS();
- }
- finally {
+ } finally {
try {
if (getOffHeapStore() != null) {
getOffHeapStore().close();
}
} finally {
- try {
- removeSystem(this);
- // Close the config object
- this.config.close();
- }
- finally {
- // Finally, mark ourselves as disconnected
- setDisconnected();
- SystemFailure.stopThreads();
- }
+ try {
+ removeSystem(this);
+ // Close the config object
+ this.config.close();
+ } finally {
+ // Finally, mark ourselves as disconnected
+ setDisconnected();
+ SystemFailure.stopThreads();
+ }
}
}
}
@@ -1392,7 +1401,7 @@ public class InternalDistributedSystem
public final DM getDM() {
return this.dm;
}
-
+
/**
* If this DistributedSystem is attempting to reconnect to the distributed system
* this will return the quorum checker created by the old MembershipManager for
@@ -1402,7 +1411,7 @@ public class InternalDistributedSystem
public final QuorumChecker getQuorumChecker() {
return this.quorumChecker;
}
-
+
/**
* Returns true if this DS has been attempting to reconnect but
* the attempt has been cancelled.
@@ -1414,7 +1423,6 @@ public class InternalDistributedSystem
/**
* Returns whether or not this distributed system has the same
* configuration as the given set of properties.
- *
* @see DistributedSystem#connect
*/
public boolean sameAs(Properties props, boolean isConnected) {
@@ -1435,7 +1443,6 @@ public class InternalDistributedSystem
* Returns whether or not the given configuration properties refer
* to the same distributed system as this
* <code>InternalDistributedSystem</code> connection.
- *
* @since GemFire 4.0
*/
public boolean sameSystemAs(Properties props) {
@@ -1466,7 +1473,6 @@ public class InternalDistributedSystem
/**
* Canonicalizes a locators string so that they may be compared.
- *
* @since GemFire 4.0
*/
private static String canonicalizeLocators(String locators) {
@@ -1479,8 +1485,7 @@ public class InternalDistributedSystem
String addr = locId.getBindAddress();
if (addr != null && addr.trim().length() > 0) {
canonical.append(addr);
- }
- else {
+ } else {
canonical.append(locId.getHost().getHostAddress());
}
canonical.append("[");
@@ -1501,14 +1506,15 @@ public class InternalDistributedSystem
private final StoppableReentrantLock elderLock;
private final StoppableCondition elderLockCondition;
-
+
public StoppableReentrantLock getElderLock() {
return elderLock;
}
+
public StoppableCondition getElderLockCondition() {
return elderLockCondition;
}
-
+
/**
* Returns the current configuration of this distributed system.
*/
@@ -1520,7 +1526,6 @@ public class InternalDistributedSystem
* Returns the id of this connection to the distributed system.
* This is actually the port of the distribution manager's
* distribution channel.
- *
* @see com.gemstone.gemfire.distributed.internal.DistributionChannel#getId
*/
@Override
@@ -1540,41 +1545,42 @@ public class InternalDistributedSystem
public InternalDistributedMember getDistributedMember() {
return this.dm.getId();
}
+
@Override
public Set<DistributedMember> getAllOtherMembers() {
return dm.getAllOtherMembers();
}
+
@Override
public Set<DistributedMember> getGroupMembers(String group) {
return dm.getGroupMembers(group);
}
-
-
+
@Override
public Set<DistributedMember> findDistributedMembers(InetAddress address) {
Set<InternalDistributedMember> allMembers = dm.getDistributionManagerIdsIncludingAdmin();
Set<DistributedMember> results = new HashSet<DistributedMember>(2);
-
+
//Search through the set of all members
- for(InternalDistributedMember member: allMembers) {
-
+ for (InternalDistributedMember member : allMembers) {
+
Set<InetAddress> equivalentAddresses = dm.getEquivalents(member.getInetAddress());
//Check to see if the passed in address is matches one of the addresses on
//the given member.
- if(address.equals(member.getInetAddress()) || equivalentAddresses.contains(address)) {
+ if (address.equals(member.getInetAddress()) || equivalentAddresses.contains(address)) {
results.add(member);
}
}
-
+
return results;
}
@Override
public DistributedMember findDistributedMember(String name) {
Set<DistributedMember> allMembers = dm.getDistributionManagerIdsIncludingAdmin();
- for(DistributedMember member : allMembers) {
- if(member.getName().equals(name)) {
+ for (DistributedMember member : allMembers) {
+ if (member.getName().equals(name)) {
return member;
}
}
@@ -1587,6 +1593,7 @@ public class InternalDistributedSystem
public DistributionConfig getOriginalConfig() {
return this.originalConfig;
}
+
@Override
public String getName() {
return getOriginalConfig().getName();
@@ -1599,7 +1606,6 @@ public class InternalDistributedSystem
* a canonical instance of <code>DistributedSystem</code> for each
* configuration, we can use the default implementation of
* <code>equals</code>.
- *
* @see #sameAs
*/
@Override
@@ -1660,7 +1666,7 @@ public class InternalDistributedSystem
sb.append(" started at ");
sb.append((new Date(this.startTime)).toString());
-
+
if (!this.isConnected()) {
sb.append(" (closed)");
}
@@ -1672,15 +1678,16 @@ public class InternalDistributedSystem
private int statsListModCount = 0;
private long statsListUniqueId = 1;
private final Object statsListUniqueIdLock = new Object();
-
+
// As the function execution stats can be lot in number, its better to put
// them in a map so that it will be accessible immediately
- private final ConcurrentHashMap<String, FunctionStats> functionExecutionStatsMap = new ConcurrentHashMap<String, FunctionStats>();
+ private final ConcurrentHashMap<String, FunctionStats> functionExecutionStatsMap = new ConcurrentHashMap<String, FunctionStats>();
private FunctionServiceStats functionServiceStats = null;
-
+
public int getStatListModCount() {
return this.statsListModCount;
}
+
public List<Statistics> getStatsList() {
return this.statsList;
}
@@ -1694,7 +1701,7 @@ public class InternalDistributedSystem
}
return result;
}
-
+
@Override
public final Statistics findStatistics(long id) {
List<Statistics> statsList = this.statsList;
@@ -1705,7 +1712,7 @@ public class InternalDistributedSystem
}
throw new RuntimeException(LocalizedStrings.PureStatSampler_COULD_NOT_FIND_STATISTICS_INSTANCE.toLocalizedString());
}
-
+
@Override
public final boolean statisticsExists(long id) {
List<Statistics> statsList = this.statsList;
@@ -1720,19 +1727,22 @@ public class InternalDistributedSystem
@Override
public final Statistics[] getStatistics() {
List<Statistics> statsList = this.statsList;
- return (Statistics[])statsList.toArray(new Statistics[statsList.size()]);
+ return (Statistics[]) statsList.toArray(new Statistics[statsList.size()]);
}
-
+
// StatisticsFactory methods
public Statistics createStatistics(StatisticsType type) {
return createOsStatistics(type, null, 0, 0);
}
+
public Statistics createStatistics(StatisticsType type, String textId) {
return createOsStatistics(type, textId, 0, 0);
}
+
public Statistics createStatistics(StatisticsType type, String textId, long numericId) {
return createOsStatistics(type, textId, numericId, 0);
}
+
public Statistics createOsStatistics(StatisticsType type, String textId, long numericId, int osStatFlags) {
if (this.statsDisabled) {
return new DummyStatisticsImpl(type, textId, numericId);
@@ -1750,11 +1760,10 @@ public class InternalDistributedSystem
}
public FunctionStats getFunctionStats(String textId) {
- FunctionStats stats = (FunctionStats)functionExecutionStatsMap.get(textId);
+ FunctionStats stats = (FunctionStats) functionExecutionStatsMap.get(textId);
if (stats == null) {
stats = new FunctionStats(this, textId);
- FunctionStats oldStats = functionExecutionStatsMap.putIfAbsent(textId,
- stats);
+ FunctionStats oldStats = functionExecutionStatsMap.putIfAbsent(textId, stats);
if (oldStats != null) {
stats.close();
stats = oldStats;
@@ -1763,11 +1772,11 @@ public class InternalDistributedSystem
return stats;
}
-
+
public FunctionServiceStats getFunctionServiceStats() {
if (functionServiceStats == null) {
synchronized (this) {
- if(functionServiceStats == null){
+ if (functionServiceStats == null) {
functionServiceStats = new FunctionServiceStats(this, "FunctionExecution");
}
}
@@ -1780,7 +1789,7 @@ public class InternalDistributedSystem
* This method was added to fix bug 40358
*/
public void visitStatistics(StatisticsVisitor visitor) {
- for (Statistics s: this.statsList) {
+ for (Statistics s : this.statsList) {
visitor.visit(s);
}
}
@@ -1790,53 +1799,56 @@ public class InternalDistributedSystem
* @see #visitStatistics
*/
public interface StatisticsVisitor {
+
public void visit(Statistics stat);
}
-
+
public Set<String> getAllFunctionExecutionIds() {
return functionExecutionStatsMap.keySet();
}
-
-
+
+
public Statistics[] findStatisticsByType(final StatisticsType type) {
final ArrayList hits = new ArrayList();
visitStatistics(new StatisticsVisitor() {
- public void visit(Statistics s) {
- if (type == s.getType()) {
- hits.add(s);
- }
+ public void visit(Statistics s) {
+ if (type == s.getType()) {
+ hits.add(s);
}
- });
+ }
+ });
Statistics[] result = new Statistics[hits.size()];
- return (Statistics[])hits.toArray(result);
+ return (Statistics[]) hits.toArray(result);
}
-
+
public Statistics[] findStatisticsByTextId(final String textId) {
final ArrayList hits = new ArrayList();
visitStatistics(new StatisticsVisitor() {
- public void visit(Statistics s) {
- if (s.getTextId().equals(textId)) {
- hits.add(s);
- }
+ public void visit(Statistics s) {
+ if (s.getTextId().equals(textId)) {
+ hits.add(s);
}
- });
+ }
+ });
Statistics[] result = new Statistics[hits.size()];
- return (Statistics[])hits.toArray(result);
+ return (Statistics[]) hits.toArray(result);
}
+
public Statistics[] findStatisticsByNumericId(final long numericId) {
final ArrayList hits = new ArrayList();
visitStatistics(new StatisticsVisitor() {
- public void visit(Statistics s) {
- if (numericId == s.getNumericId()) {
- hits.add(s);
- }
+ public void visit(Statistics s) {
+ if (numericId == s.getNumericId()) {
+ hits.add(s);
}
- });
+ }
+ });
Statistics[] result = new Statistics[hits.size()];
- return (Statistics[])hits.toArray(result);
+ return (Statistics[]) hits.toArray(result);
}
+
public Statistics findStatisticsByUniqueId(final long uniqueId) {
- for (Statistics s: this.statsList) {
+ for (Statistics s : this.statsList) {
if (uniqueId == s.getUniqueId()) {
return s;
}
@@ -1844,7 +1856,9 @@ public class InternalDistributedSystem
return null;
}
- /** for internal use only. Its called by {@link LocalStatisticsImpl#close}. */
+ /**
+ * for internal use only. Its called by {@link LocalStatisticsImpl#close}.
+ */
public void destroyStatistics(Statistics stats) {
synchronized (statsList) {
if (statsList.remove(stats)) {
@@ -1856,14 +1870,16 @@ public class InternalDistributedSystem
public Statistics createAtomicStatistics(StatisticsType type) {
return createAtomicStatistics(type, null, 0);
}
+
public Statistics createAtomicStatistics(StatisticsType type, String textId) {
return createAtomicStatistics(type, textId, 0);
}
+
public Statistics createAtomicStatistics(StatisticsType type, String textId, long numericId) {
if (this.statsDisabled) {
return new DummyStatisticsImpl(type, textId, numericId);
}
-
+
long myUniqueId;
synchronized (statsListUniqueIdLock) {
myUniqueId = statsListUniqueId++; // fix for bug 30597
@@ -1883,64 +1899,63 @@ public class InternalDistributedSystem
/**
* Creates or finds a StatisticType for the given shared class.
*/
- public StatisticsType createType(String name, String description,
- StatisticDescriptor[] stats) {
+ public StatisticsType createType(String name, String description, StatisticDescriptor[] stats) {
return tf.createType(name, description, stats);
}
+
public StatisticsType findType(String name) {
return tf.findType(name);
}
- public StatisticsType[] createTypesFromXml(Reader reader)
- throws IOException {
+
+ public StatisticsType[] createTypesFromXml(Reader reader) throws IOException {
return tf.createTypesFromXml(reader);
}
- public StatisticDescriptor createIntCounter(String name, String description,
- String units) {
+ public StatisticDescriptor createIntCounter(String name, String description, String units) {
return tf.createIntCounter(name, description, units);
}
- public StatisticDescriptor createLongCounter(String name, String description,
- String units) {
+
+ public StatisticDescriptor createLongCounter(String name, String description, String units) {
return tf.createLongCounter(name, description, units);
}
- public StatisticDescriptor createDoubleCounter(String name, String description,
- String units) {
+
+ public StatisticDescriptor createDoubleCounter(String name, String description, String units) {
return tf.createDoubleCounter(name, description, units);
}
- public StatisticDescriptor createIntGauge(String name, String description,
- String units) {
+
+ public StatisticDescriptor createIntGauge(String name, String description, String units) {
return tf.createIntGauge(name, description, units);
}
- public StatisticDescriptor createLongGauge(String name, String description,
- String units) {
+
+ public StatisticDescriptor createLongGauge(String name, String description, String units) {
return tf.createLongGauge(name, description, units);
}
- public StatisticDescriptor createDoubleGauge(String name, String description,
- String units) {
+
+ public StatisticDescriptor createDoubleGauge(String name, String description, String units) {
return tf.createDoubleGauge(name, description, units);
}
- public StatisticDescriptor createIntCounter(String name, String description,
- String units, boolean largerBetter) {
+
+ public StatisticDescriptor createIntCounter(String name, String description, String units, boolean largerBetter) {
return tf.createIntCounter(name, description, units, largerBetter);
}
- public StatisticDescriptor createLongCounter(String name, String description,
- String units, boolean largerBetter) {
+
+ public StatisticDescriptor createLongCounter(String name, String description, String units, boolean largerBetter) {
return tf.createLongCounter(name, description, units, largerBetter);
}
- public StatisticDescriptor createDoubleCounter(String name, String description,
- String units, boolean largerBetter) {
+
+ public StatisticDescriptor createDoubleCounter(String name, String description, String units, boolean largerBetter) {
return tf.createDoubleCounter(name, description, units, largerBetter);
}
- public StatisticDescriptor createIntGauge(String name, String description,
- String units, boolean largerBetter) {
+
+ public StatisticDescriptor createIntGauge(String name, String description, String units, boolean largerBetter) {
return tf.createIntGauge(name, description, units, largerBetter);
}
- public StatisticDescriptor createLongGauge(String name, String description,
- String units, boolean largerBetter) {
+
+ public StatisticDescriptor createLongGauge(String name, String description, String units, boolean largerBetter) {
return tf.createLongGauge(name, description, units, largerBetter);
}
- public StatisticDescriptor createDoubleGauge(String name, String description,
- String units, boolean largerBetter) {
+
+ public StatisticDescriptor createDoubleGauge(String name, String description, String units, boolean largerBetter) {
return tf.createDoubleGauge(name, description, units, largerBetter);
}
@@ -1967,11 +1982,11 @@ public class InternalDistributedSystem
* Makes note of a <code>ReconnectListener</code> whose
* <code>onReconnect</code> method will be invoked when a connection is
* recreated to a distributed system during auto-reconnect.<p>
- *
+ * <p>
* The ReconnectListener set is cleared after a disconnect.
*/
public static void addReconnectListener(ReconnectListener listener) {
-// (new ManagerLogWriter(LogWriterImpl.FINE_LEVEL, System.out)).fine("registering reconnect listener: " + listener);
+ // (new ManagerLogWriter(LogWriterImpl.FINE_LEVEL, System.out)).fine("registering reconnect listener: " + listener);
synchronized (existingSystemsLock) {
synchronized (reconnectListeners) {
reconnectListeners.add(listener);
@@ -1997,18 +2012,16 @@ public class InternalDistributedSystem
*/
private static void notifyConnectListeners(InternalDistributedSystem sys) {
synchronized (connectListeners) {
- for (Iterator iter = connectListeners.iterator(); iter.hasNext();) {
+ for (Iterator iter = connectListeners.iterator(); iter.hasNext(); ) {
try {
ConnectListener listener = (ConnectListener) iter.next();
listener.onConnect(sys);
- }
- catch (VirtualMachineError err) {
+ } catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
@@ -2041,7 +2054,7 @@ public class InternalDistributedSystem
synchronized (reconnectListeners) {
listeners = new ArrayList<ReconnectListener>(reconnectListeners);
}
- for (ReconnectListener listener: listeners) {
+ for (ReconnectListener listener : listeners) {
try {
if (starting) {
listener.reconnecting(oldsys);
@@ -2051,7 +2064,7 @@ public class InternalDistributedSystem
} catch (Throwable t) {
Error err;
if (t instanceof OutOfMemoryError || t instanceof UnknownError) {
- err = (Error)t;
+ err = (Error) t;
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
@@ -2071,19 +2084,15 @@ public class InternalDistributedSystem
/**
* Notifies all resource event listeners. All exceptions are caught here and
* only a warning message is printed in the log
- *
- * @param event
- * Enumeration depicting particular resource event
- * @param resource
- * the actual resource object.
+ * @param event Enumeration depicting particular resource event
+ * @param resource the actual resource object.
*/
private void notifyResourceEventListeners(ResourceEvent event, Object resource) {
- for (Iterator<ResourceEventsListener> iter = resourceListeners.iterator(); iter
- .hasNext();) {
+ for (Iterator<ResourceEventsListener> iter = resourceListeners.iterator(); iter.hasNext(); ) {
try {
ResourceEventsListener listener = (ResourceEventsListener) iter.next();
listener.handleEvent(event, resource);
- } catch(CancelException e) {
+ } catch (CancelException e) {
//ignore
} catch (ManagementException ex) {
if (event == ResourceEvent.CACHE_CREATE) {
@@ -2112,11 +2121,9 @@ public class InternalDistributedSystem
synchronized (this.listeners) {
this.listeners.add(listener);
- Boolean disconnectListenerThreadBoolean =
- (Boolean) disconnectListenerThread.get();
+ Boolean disconnectListenerThreadBoolean = (Boolean) disconnectListenerThread.get();
- if (disconnectListenerThreadBoolean == null ||
- !disconnectListenerThreadBoolean.booleanValue()) {
+ if (disconnectListenerThreadBoolean == null || !disconnectListenerThreadBoolean.booleanValue()) {
// Don't add disconnect listener after messaging has been disabled.
// Do this test _after_ adding the listener to narrow the window.
// It's possible to miss it still and never invoke the listener, but
@@ -2124,12 +2131,13 @@ public class InternalDistributedSystem
String reason = this.stopper.cancelInProgress();
if (reason != null) {
this.listeners.remove(listener); // don't leave in the list!
- throw new DistributedSystemDisconnectedException(LocalizedStrings.InternalDistributedSystem_NO_LISTENERS_PERMITTED_AFTER_SHUTDOWN_0.toLocalizedString(reason), dm.getRootCause());
+ throw new DistributedSystemDisconnectedException(LocalizedStrings.InternalDistributedSystem_NO_LISTENERS_PERMITTED_AFTER_SHUTDOWN_0.toLocalizedString(reason), dm
+ .getRootCause());
}
}
} // synchronized
}
-
+
/**
* A non-null value of Boolean.TRUE will identify a thread being used to
* execute disconnectListeners. {@link #addDisconnectListener} will
@@ -2157,11 +2165,11 @@ public class InternalDistributedSystem
List l = existingSystems;
if (l.isEmpty()) {
return null;
- }
- else {
- return (InternalDistributedSystem)l.get(0);
+ } else {
+ return (InternalDistributedSystem) l.get(0);
}
}
+
/**
* Test hook
*/
@@ -2173,7 +2181,7 @@ public class InternalDistributedSystem
public Properties getProperties() {
return this.config.toProperties();
}
-
+
@Override
public Properties getSecurityProperties() {
return this.config.getSecurityProps();
@@ -2182,7 +2190,6 @@ public class InternalDistributedSystem
/**
* Fires an "informational" <code>SystemMembershipEvent</code> in
* admin VMs.
- *
* @since GemFire 4.0
*/
public void fireInfoEvent(Object callback) {
@@ -2193,7 +2200,7 @@ public class InternalDistributedSystem
* Installs a shutdown hook to ensure
* that we are disconnected if an application VM shuts down
* without first calling disconnect itself.
- */
+ */
public static final Thread shutdownHook;
static {
@@ -2203,32 +2210,30 @@ public class InternalDistributedSystem
Thread tmp_shutdownHook = null;
try {
//Added for bug 38407
- if( ! Boolean.getBoolean(DISABLE_SHUTDOWN_HOOK_PROPERTY)) {
+ if (!Boolean.getBoolean(DISABLE_SHUTDOWN_HOOK_PROPERTY)) {
tmp_shutdownHook = new Thread(tg, new Runnable() {
public void run() {
DistributedSystem ds = InternalDistributedSystem.getAnyInstance();
setThreadsSocketPolicy(true /* conserve sockets */);
if (ds != null && ds.isConnected()) {
- LogWriterI18n log = ((InternalDistributedSystem)ds).getInternalLogWriter();
+ LogWriterI18n log = ((InternalDistributedSystem) ds).getInternalLogWriter();
log.info(LocalizedStrings.InternalDistributedSystem_shutdownHook_shuttingdown);
- DurableClientAttributes dca = ((InternalDistributedSystem)ds).getDistributedMember().getDurableClientAttributes();
+ DurableClientAttributes dca = ((InternalDistributedSystem) ds).getDistributedMember().getDurableClientAttributes();
boolean isDurableClient = false;
-
- if(dca != null) {
- isDurableClient = ((dca.getId() == null || dca.getId().isEmpty()) ? false : true);
+
+ if (dca != null) {
+ isDurableClient = ((dca.getId() == null || dca.getId().isEmpty()) ? false : true);
}
-
- ((InternalDistributedSystem)ds).disconnect(false,
- LocalizedStrings.InternalDistributedSystem_NORMAL_DISCONNECT
- .toLocalizedString(), isDurableClient/*keep alive drive from this*/);
+
+ ((InternalDistributedSystem) ds).disconnect(false, LocalizedStrings.InternalDistributedSystem_NORMAL_DISCONNECT.toLocalizedString(), isDurableClient/*keep alive drive from this*/);
// this was how we wanted to do it for 5.7, but there were shutdown
// issues in PR/dlock (see bug 39287)
-// InternalDistributedSystem ids = (InternalDistributedSystem)ds;
-// if (ids.getDistributionManager() != null &&
-// ids.getDistributionManager().getMembershipManager() != null) {
-// ids.getDistributionManager().getMembershipManager()
-// .uncleanShutdown("VM is exiting", null);
-// }
+ // InternalDistributedSystem ids = (InternalDistributedSystem)ds;
+ // if (ids.getDistributionManager() != null &&
+ // ids.getDistributionManager().getMembershipManager() != null) {
+ // ids.getDistributionManager().getMembershipManager()
+ // .uncleanShutdown("VM is exiting", null);
+ // }
}
}
}, SHUTDOWN_HOOK_NAME);
@@ -2249,26 +2254,25 @@ public class InternalDistributedSystem
/**
* Invoked before a connection to the distributed system is
* disconnected.
- *
* @param sys the the system we are disconnecting from
* process should take before returning.
*/
public void onDisconnect(InternalDistributedSystem sys);
}
-
+
/**
* A listener that gets invoked before and after a successful auto-reconnect
*/
public interface ReconnectListener {
+
/**
* Invoked when reconnect attempts are initiated
- *
* @param oldSystem the old DS, which is in a partially disconnected state
* and cannot be used for messaging
*/
public void reconnecting(InternalDistributedSystem oldSystem);
-
+
/**
* Invoked after a reconnect to the distributed system
* @param oldSystem the old DS
@@ -2280,9 +2284,9 @@ public class InternalDistributedSystem
/**
* A listener that gets invoked after this connection to the
* distributed system is disconnected
- *
*/
public interface ShutdownListener extends DisconnectListener {
+
/**
* Invoked after the connection to the distributed system has
* been disconnected
@@ -2294,7 +2298,7 @@ public class InternalDistributedSystem
/**
* Integer representing number of tries already made
* to reconnect and that failed.
- * */
+ */
private volatile static int reconnectAttemptCounter = 0;
/**
@@ -2305,14 +2309,14 @@ public class InternalDistributedSystem
/**
* Boolean indicating if DS needs to reconnect and reconnect
* is in progress.
- * */
+ */
private volatile boolean attemptingToReconnect = false;
-
+
/**
* Boolean indicating this DS joined through a reconnect attempt
*/
private volatile boolean reconnected = false;
-
+
/**
* Boolean indicating that this member has been shunned by other members
* or a network partition has occurred
@@ -2328,14 +2332,14 @@ public class InternalDistributedSystem
* We need to know when reconnecting.
*/
private boolean locatorDMTypeForced;
-
-
+
+
/**
* Returns true if we are reconnecting the distributed system or
* reconnect has completed. If this returns true it means that
* this instance of the DS is now disconnected and unusable.
*/
- public boolean isReconnecting(){
+ public boolean isReconnecting() {
InternalDistributedSystem rds = this.reconnectDS;
if (!attemptingToReconnect) {
return false;
@@ -2360,7 +2364,7 @@ public class InternalDistributedSystem
public boolean isReconnectingDS() {
return this.isReconnectingDS;
}
-
+
/**
* returns the membership socket of the old
* distributed system, if available, when
@@ -2374,7 +2378,7 @@ public class InternalDistributedSystem
}
return null;
}
-
+
/**
* Returns true if this DS reconnected to the distributed system after
* a forced disconnect or loss of required-roles
@@ -2382,7 +2386,7 @@ public class InternalDistributedSystem
public boolean reconnected() {
return this.reconnected;
}
-
+
/**
* Returns true if this DS has been kicked out of the distributed system
*/
@@ -2395,20 +2399,21 @@ public class InternalDistributedSystem
*/
private volatile boolean reconnectCancelled = false;
- /** Make sure this instance of DS never does a reconnect.
+ /**
+ * Make sure this instance of DS never does a reconnect.
* Also if reconnect is in progress cancel it.
*/
public void cancelReconnect() {
-// (new ManagerLogWriter(LogWriterImpl.FINE_LEVEL, System.out)).fine("cancelReconnect invoked", new Exception("stack trace"));
+ // (new ManagerLogWriter(LogWriterImpl.FINE_LEVEL, System.out)).fine("cancelReconnect invoked", new Exception("stack trace"));
this.reconnectCancelled = true;
if (isReconnecting()) {
synchronized (this.reconnectLock) { // should the synchronized be first on this and
- // then on this.reconnectLock.
+ // then on this.reconnectLock.
this.reconnectLock.notifyAll();
}
}
}
-
+
/**
* This lock must be acquired *after* locking any GemFireCache.
*/
@@ -2417,9 +2422,7 @@ public class InternalDistributedSystem
/**
* Tries to reconnect to the distributed system on role loss
* if configure to reconnect.
- *
* @param oldCache cache that has apparently failed
- *
*/
public boolean tryReconnect(boolean forcedDisconnect, String reason, GemFireCacheImpl oldCache) {
final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -2459,19 +2462,18 @@ public class InternalDistributedSystem
/**
* Returns the value for the number of time reconnect has been tried.
* Test method used by DUnit.
- * */
+ */
public static int getReconnectAttemptCounter() {
return reconnectAttemptCounter;
}
-
+
/**
* A reconnect is tried when gemfire is configured to reconnect in
* case of a required role loss. The reconnect will try reconnecting
* to the distributed system every max-time-out millseconds for
* max-number-of-tries configured in gemfire.properties file. It uses
* the cache.xml file to intialize the cache and create regions.
- *
- * */
+ */
private void reconnect(boolean forcedDisconnect, String reason) {
// Collect all the state for cache
@@ -2485,25 +2487,25 @@ public class InternalDistributedSystem
//
// If reconnecting for forced-disconnect we ignore max-tries and keep attempting
// to join the distributed system until successful
-
+
this.attemptingToReconnect = true;
InternalDistributedSystem ids = InternalDistributedSystem.getAnyInstance();
if (ids == null) {
ids = this;
}
-
+
// first save the current cache description. This is created by
// the membership manager when forced-disconnect starts. If we're
// reconnecting for lost roles then this will be null
String cacheXML = null;
List<CacheServerCreation> cacheServerCreation = null;
-
+
GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
if (cache != null) {
cacheXML = cache.getCacheConfig().getCacheXMLDescription();
cacheServerCreation = cache.getCacheConfig().getCacheServerCreation();
}
-
+
DistributionConfig oldConfig = ids.getConfig();
Properties configProps = getProperties();
int timeOut = oldConfig.getMaxWaitTimeForReconnect();
@@ -2517,7 +2519,7 @@ public class InternalDistributedSystem
}
Thread.currentThread().setName("ReconnectThread");
}
-
+
// get the membership manager for quorum checks
MembershipManager mbrMgr = this.dm.getMembershipManager();
this.quorumChecker = mbrMgr.getQuorumChecker();
@@ -2528,7 +2530,7 @@ public class InternalDistributedSystem
logger.debug("Initialized quorum checking service: {}", quorumChecker);
}
}
-
+
// LOG:CLEANUP: deal with reconnect and INHIBIT_DM_BANNER -- this should be ok now
String appendToLogFile = System.getProperty(APPEND_TO_LOG_FILE);
if (appendToLogFile == null) {
@@ -2551,36 +2553,34 @@ public class InternalDistributedSystem
if (isDebugEnabled) {
logger.debug("Max number of tries : {} and max time out : {}", maxTries, timeOut);
}
- if(reconnectAttemptCounter >= maxTries){
+ if (reconnectAttemptCounter >= maxTries) {
if (isDebugEnabled) {
logger.debug("Stopping the checkrequiredrole thread because reconnect : {} reached the max number of reconnect tries : {}", reconnectAttemptCounter, maxTries);
}
throw new CacheClosedException(LocalizedStrings.InternalDistributedSystem_SOME_REQUIRED_ROLES_MISSING.toLocalizedString());
}
}
-
+
if (reconnectAttemptCounter == 0) {
reconnectAttemptTime = System.currentTimeMillis();
<TRUNCATED>