You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/20 00:59:58 UTC

[40/51] [abbrv] [partial] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-917

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
index 276efec,0000000..a193699
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
@@@ -1,3056 -1,0 +1,3079 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.gemstone.gemfire.distributed.internal;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.io.Reader;
 +import java.lang.reflect.Array;
 +import java.net.InetAddress;
 +import java.util.ArrayList;
 +import java.util.Date;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.LinkedHashSet;
 +import java.util.List;
 +import java.util.Properties;
 +import java.util.Set;
 +import java.util.SortedSet;
 +import java.util.StringTokenizer;
 +import java.util.TreeSet;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.CopyOnWriteArrayList;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +import org.apache.logging.log4j.Logger;
 +
 +import com.gemstone.gemfire.CancelCriterion;
 +import com.gemstone.gemfire.CancelException;
 +import com.gemstone.gemfire.ForcedDisconnectException;
 +import com.gemstone.gemfire.GemFireConfigException;
 +import com.gemstone.gemfire.GemFireIOException;
 +import com.gemstone.gemfire.LogWriter;
 +import com.gemstone.gemfire.StatisticDescriptor;
 +import com.gemstone.gemfire.Statistics;
 +import com.gemstone.gemfire.StatisticsType;
 +import com.gemstone.gemfire.StatisticsTypeFactory;
 +import com.gemstone.gemfire.SystemConnectException;
 +import com.gemstone.gemfire.SystemFailure;
 +import com.gemstone.gemfire.admin.AlertLevel;
 +import com.gemstone.gemfire.cache.CacheClosedException;
 +import com.gemstone.gemfire.cache.CacheFactory;
 +import com.gemstone.gemfire.cache.execute.internal.FunctionServiceManager;
++import com.gemstone.gemfire.cache.server.CacheServer;
 +import com.gemstone.gemfire.distributed.DistributedMember;
 +import com.gemstone.gemfire.distributed.DistributedSystem;
 +import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
 +import com.gemstone.gemfire.distributed.DurableClientAttributes;
 +import com.gemstone.gemfire.distributed.internal.locks.GrantorRequestProcessor;
 +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 +import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
 +import com.gemstone.gemfire.distributed.internal.membership.QuorumChecker;
 +import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
 +import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager;
 +import com.gemstone.gemfire.i18n.LogWriterI18n;
 +import com.gemstone.gemfire.internal.Assert;
 +import com.gemstone.gemfire.internal.DSFIDFactory;
 +import com.gemstone.gemfire.internal.DummyStatisticsImpl;
 +import com.gemstone.gemfire.internal.GemFireStatSampler;
 +import com.gemstone.gemfire.internal.InternalDataSerializer;
 +import com.gemstone.gemfire.internal.InternalInstantiator;
 +import com.gemstone.gemfire.internal.LinuxProcFsStatistics;
 +import com.gemstone.gemfire.internal.LocalStatisticsImpl;
 +import com.gemstone.gemfire.internal.OsStatisticsFactory;
 +import com.gemstone.gemfire.internal.SocketCreator;
 +import com.gemstone.gemfire.internal.StatisticsImpl;
 +import com.gemstone.gemfire.internal.StatisticsManager;
 +import com.gemstone.gemfire.internal.StatisticsTypeFactoryImpl;
 +import com.gemstone.gemfire.internal.SystemTimer;
 +import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
 +import com.gemstone.gemfire.internal.cache.CacheServerImpl;
 +import com.gemstone.gemfire.internal.cache.CacheConfig;
 +import com.gemstone.gemfire.internal.cache.EventID;
 +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 +import com.gemstone.gemfire.internal.cache.execute.FunctionServiceStats;
 +import com.gemstone.gemfire.internal.cache.execute.FunctionStats;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.HandShake;
 +import com.gemstone.gemfire.internal.cache.xmlcache.CacheServerCreation;
 +import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 +import com.gemstone.gemfire.internal.logging.InternalLogWriter;
 +import com.gemstone.gemfire.internal.logging.LogService;
 +import com.gemstone.gemfire.internal.logging.LogWriterFactory;
 +import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
 +import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
 +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 +import com.gemstone.gemfire.internal.logging.log4j.LogWriterAppender;
 +import com.gemstone.gemfire.internal.logging.log4j.LogWriterAppenders;
 +import com.gemstone.gemfire.internal.offheap.MemoryAllocator;
 +import com.gemstone.gemfire.internal.offheap.OffHeapStorage;
 +import com.gemstone.gemfire.internal.tcp.ConnectionTable;
 +import com.gemstone.gemfire.internal.util.concurrent.StoppableCondition;
 +import com.gemstone.gemfire.internal.util.concurrent.StoppableReentrantLock;
 +import com.gemstone.gemfire.management.ManagementException;
 +import com.gemstone.gemfire.security.GemFireSecurityException;
 +
 +/**
 + * The concrete implementation of {@link DistributedSystem} that
 + * provides internal-only functionality.
 + *
 + * @author David Whitlock
 + * @since 3.0
 + *
 + */
 +public class InternalDistributedSystem
 +  extends DistributedSystem
 +  implements OsStatisticsFactory, StatisticsManager
 +{
 +  private static final Logger logger = LogService.getLogger();
 +  
 +  public static final String DISABLE_MANAGEMENT_PROPERTY = "gemfire.disableManagement";
 +  
 +  /**
 +   * If auto-reconnect is going on this will hold a reference to it
 +   */
 +  public static volatile DistributedSystem systemAttemptingReconnect;
 +
 +  public static final CreationStackGenerator DEFAULT_CREATION_STACK_GENERATOR = new CreationStackGenerator() {
 +    @Override
 +    public Throwable generateCreationStack(final DistributionConfig config) {
 +      return null;
 +    }
 +  };
 +
 +  // the following is overridden from DistributedTestCase to fix #51058
 +  public static final AtomicReference<CreationStackGenerator> TEST_CREATION_STACK_GENERATOR = new AtomicReference<CreationStackGenerator>(DEFAULT_CREATION_STACK_GENERATOR);
 +  
 +  /** The distribution manager that is used to communicate with the
 +   * distributed system. */
 +  protected DM dm;
 +
 +  private final GrantorRequestProcessor.GrantorRequestContext grc;
 +  public GrantorRequestProcessor.GrantorRequestContext getGrantorRequestContext() {
 +    return grc;
 +  }
 +  
 +  /** Numeric id that identifies this node in a DistributedSystem */
 +  private long id;
 +
 +  /** The log writer used to log information messages */
 +  protected InternalLogWriter logWriter = null;
 +
 +  /** The log writer used to log security related messages */
 +  protected InternalLogWriter securityLogWriter = null;
 + 
 +  /** Distributed System clock */
 +  private DSClock clock;
 +
 +//  /** The log writer was provided by an external entity */
 +//  private boolean externalLogWriterProvided = false;
 +//
 +  private LogWriterAppender logWriterAppender = null;
 +
 +  private LogWriterAppender securityLogWriterAppender = null;
 +  
 +  /** Time this system was created */
 +  private final long startTime;
 +
 +  /**
 +   * Guards access to {@link #isConnected}
 +   */
 +  protected final Object isConnectedMutex = new Object();
 +  
 +  /** Is this <code>DistributedSystem</code> connected to a
 +   * distributed system? 
 +   *
 +   * Concurrency: volatile for reads and protected by synchronization of {@link #isConnectedMutex} for writes
 +   */
 +  protected volatile boolean isConnected;
 +
 +  /**
 +   * Set to true if this distributed system is a singleton; it will
 +   * always be the only member of the system.
 +   */
 +  private boolean isLoner = false;
 +
 +  /** The sampler for this DistributedSystem.
 +   */
 +  private GemFireStatSampler sampler = null;
 +
 +  /** A set of listeners that are invoked when this connection to the
 +   * distributed system is disconnected */
 +  private final Set listeners = new LinkedHashSet(); // needs to be ordered
 +
 +  /** Set of listeners that are invoked whenever a connection is created to
 +   * the distributed system */
 +  private static Set connectListeners = new LinkedHashSet(); // needs to be ordered
 +  
 +  /** auto-reconnect listeners */
 +  private static List<ReconnectListener> reconnectListeners = new ArrayList<ReconnectListener>();
 +  
 +  /**
 +   * whether this DS is one created to reconnect to the distributed 
 +   * system after a forced-disconnect.  This state is cleared once reconnect
 +   * is successful.
 +   */
 +  private boolean isReconnectingDS;
 +  
 +  /**
 +   * During a reconnect attempt this is used to perform quorum checks
 +   * before allowing a location service to be started up in this JVM.
 +   * If quorum checks fail then we delay starting location services until
 +   * a live locator can be contacted.
 +   */
 +  private QuorumChecker quorumChecker;
 +  
 +  /** sqlfire disconnect listener */
 +  private DisconnectListener sqlfDisconnectListener;
 +
 +  /**
 +   * A Constant that matches the ThreadGroup name of the shutdown hook.
 +   * This constant is used to insure consistency with LoggingThreadGroup.
 +   * Due to Bug 38407, be careful about moving this to another class.
 +   */
 +  public final static String SHUTDOWN_HOOK_NAME = "Distributed system shutdown hook";
 +  /**
 +   * A property to prevent shutdown hooks from being registered with the VM.
 +   * This is regarding bug 38407
 +   */
 +  public final static String DISABLE_SHUTDOWN_HOOK_PROPERTY = "gemfire.disableShutdownHook";
 +
 +  /**
 +   * A property to append to existing log-file instead of truncating it.
 +   */
 +  public final static String APPEND_TO_LOG_FILE = "gemfire.append-log";
 +
 +  ////////////////////  Configuration Fields  ////////////////////
 +
 +  /** The config object used to create this distributed system */
 +  private final DistributionConfig originalConfig;
 +
 +  /** The config object to which most configuration work is delegated */
 +  private DistributionConfig config;
 +
 +  private final boolean statsDisabled = Boolean.getBoolean("gemfire.statsDisabled");
 +
 +  private volatile boolean shareSockets = DistributionConfig.DEFAULT_CONSERVE_SOCKETS;
 +
 +  /** if this distributed system starts a locator, it is stored here */
 +  private InternalLocator startedLocator;
 +  
 +  private List<ResourceEventsListener> resourceListeners;
 +  
 +  private final boolean disableManagement = Boolean.getBoolean(DISABLE_MANAGEMENT_PROPERTY);
 +
 +  /**
 +   * Stack trace showing the creation of this instance of InternalDistributedSystem.
 +   */
 +  private final Throwable creationStack;
 +  
 +  /////////////////////  Static Methods  /////////////////////
 +
 +  /**
 +   * Creates a new instance of <code>InternalDistributedSystem</code>
 +   * with the given configuration.
 +   */
 +  public static InternalDistributedSystem newInstance(Properties config) {
 +    boolean success = false;
 +    InternalDataSerializer.checkSerializationVersion();
 +    try {
 +      SystemFailure.startThreads();
 +    InternalDistributedSystem newSystem = new InternalDistributedSystem(config);
 +    newSystem.initialize();
 +    reconnectAttemptCounter = 0; // reset reconnect count since we just got a new connection
 +    notifyConnectListeners(newSystem);
 +    success = true;
 +    return newSystem;
 +    } finally {
 +      if (!success) {
 +        LoggingThreadGroup.cleanUpThreadGroups(); // bug44365 - logwriters accumulate, causing mem leak
 +        SystemFailure.stopThreads();
 +      }
 +    }
 +  }
 +  
 +  
 +  /**
 +   * creates a non-functional instance for testing
 +   * @param nonDefault - non-default distributed system properties
 +   */
 +  public static InternalDistributedSystem newInstanceForTesting(DM dm, Properties nonDefault) {
 +    InternalDistributedSystem sys = new InternalDistributedSystem(nonDefault);
 +    sys.config = new RuntimeDistributionConfigImpl(sys);
 +    sys.dm = dm;
 +    sys.isConnected = true;
 +    return sys;
 +  }
 +
 +  /**
 +   * Returns a connection to the distributed system that is suitable
 +   * for administration.  For administration, we are not as strict
 +   * when it comes to existing connections.
 +   *
 +   * @since 4.0
 +   */
 +  public static DistributedSystem connectForAdmin(Properties props) {
 +    return DistributedSystem.connectForAdmin(props);
 +  }
 +
 +  /**
 +   * Returns a connected distributed system for this VM, or null
 +   * if there is no connected distributed system in this VM.
 +   * This method synchronizes on the existingSystems collection.
 +   *
 +   * <p>author bruce
 +   * @since 5.0
 +   */
 +  public static InternalDistributedSystem getConnectedInstance() {
 +    InternalDistributedSystem result = null;
 +    synchronized (existingSystemsLock) {
 +      if (!existingSystems.isEmpty()) {
 +        InternalDistributedSystem existingSystem =
 +          (InternalDistributedSystem) existingSystems.get(0);
 +        if (existingSystem.isConnected())
 +          result = existingSystem;
 +      }
 +    }
 +    return result;
 +  }
 +
 +  /**
 +   * Returns the current distributed system, if there is one.
 +   * Note: this method is no longer unsafe size existingSystems uses copy-on-write.
 +   * <p>author bruce
 +   * @since 5.0
 +   */
 +  public static InternalDistributedSystem unsafeGetConnectedInstance() {
 +    InternalDistributedSystem result = getAnyInstance();
 +    if (result != null) {
 +      if (!result.isConnected()) {
 +        result = null;
 +      }
 +    }
 +    return result;
 +  }
 +
 +  /**
 +   * @return distribution stats, or null if there is no distributed system available
 +   */
 +  public static DMStats getDMStats() {
 +    InternalDistributedSystem sys = getAnyInstance();
 +    if (sys != null && sys.dm != null) {
 +      return sys.dm.getStats();
 +    }
 +    return null;
 +  }
 +
 +  /**
 +   * @return a log writer, or null if there is no distributed system available
 +   */
 +  public static LogWriterI18n getLoggerI18n() {
 +    InternalDistributedSystem sys = getAnyInstance();
 +    if (sys != null && sys.logWriter != null) {
 +      return sys.logWriter.convertToLogWriterI18n();
 +    }
 +    return null;
 +  }
 +  
 +  public static InternalLogWriter getStaticInternalLogWriter() {
 +    InternalDistributedSystem sys = getAnyInstance();
 +    if (sys != null) {
 +      return sys.logWriter;
 +    }
 +    return null;
 +  }
 +  
 +  public InternalLogWriter getInternalLogWriter() {
 +    return this.logWriter;
 +  }
 +  
 +  public static InternalLogWriter getStaticSecurityInternalLogWriter() {
 +    InternalDistributedSystem sys = getAnyInstance();
 +    if (sys != null) {
 +      return sys.securityLogWriter;
 +    }
 +    return null;
 +  }
 +  
 +  public InternalLogWriter getSecurityInternalLogWriter() {
 +    InternalDistributedSystem sys = getAnyInstance();
 +    if (sys != null) {
 +      return sys.securityLogWriter;
 +    }
 +    return null;
 +  }
 +  
 +  /** reset the reconnectAttempt counter for a new go at reconnecting */
 +  private static void resetReconnectAttemptCounter() {
 +    reconnectAttemptCounter = 0;
 +  }
 +
 +
 +  //////////////////////  Constructors  //////////////////////
 +
 +  /**
 +   * Creates a new <code>InternalDistributedSystem</code> with the
 +   * given configuration properties.  Does all of the magic of finding
 +   * the "default" values of properties.  See {@link
 +   * DistributedSystem#connect} for a list of exceptions that may be
 +   * thrown.
 +   *
 +   * @param nonDefault
 +   *        The non-default configuration properties specified by the
 +   *        caller
 +   *
 +   * @see DistributedSystem#connect
 +   */
 +  private InternalDistributedSystem(Properties nonDefault) {
 +
 +    // register DSFID types first; invoked explicitly so that all message type
 +    // initializations do not happen in first deserialization on a possibly
 +    // "precious" thread
 +    DSFIDFactory.registerTypes();
 +
 +    Object o = nonDefault.remove(DistributionConfig.DS_RECONNECTING_NAME);
 +    if (o instanceof Boolean) {
 +      this.isReconnectingDS = ((Boolean)o).booleanValue();
 +    } else {
 +      this.isReconnectingDS = false;
 +    }
 +    
 +    o = nonDefault.remove(DistributionConfig.DS_QUORUM_CHECKER_NAME);
 +    if (o instanceof QuorumChecker) {
 +      this.quorumChecker = (QuorumChecker)o;
 +    }
 +
 +    o = nonDefault.remove(DistributionConfig.DS_CONFIG_NAME);
 +    if (o instanceof DistributionConfigImpl) {
 +      this.originalConfig = (DistributionConfigImpl) o;
 +    } else {
 +      this.originalConfig = new DistributionConfigImpl(nonDefault);
 +    }
 +    
 +    ((DistributionConfigImpl)this.originalConfig).checkForDisallowedDefaults(); // throws IllegalStateEx
 +    this.shareSockets = this.originalConfig.getConserveSockets();
 +    this.startTime = System.currentTimeMillis();
 +    this.grc = new GrantorRequestProcessor.GrantorRequestContext(stopper);
 +    
 +    elderLock = new StoppableReentrantLock(stopper);
 +    elderLockCondition = elderLock.newCondition();
 +
 +    this.creationStack = TEST_CREATION_STACK_GENERATOR.get().generateCreationStack(this.originalConfig);
 +
 +//    if (DistributionConfigImpl.multicastTest) {
 +//      this.logger.warning("Use of multicast has been forced");
 +//    }
 +//    if (DistributionConfigImpl.forceDisableTcp) {
 +//      this.logger.warning("Use of UDP has been forced");
 +//    }
 +//    if (com.gemstone.gemfire.distributed.internal.membership.jgroup.JGroupMembershipManager.multicastTest) {
 +//      this.logger.warning("Use of multicast for all distributed cache operations has been forced");
 +//    }
 +
 +  }
 +
 +  ////////////////////  Instance Methods  ////////////////////
 +  
 +  /**
 +   * Registers a listener to the system
 +   * 
 +   * @param listener
 +   *          listener to be added
 +   */
 +  public void addResourceListener(ResourceEventsListener listener) {
 +    resourceListeners.add(listener);
 +  }
 +  
 +  /**
 +   * Un-Registers a listener to the system
 +   * 
 +   * @param listener
 +   *          listener to be removed
 +   */
 +  public void removeResourceListener(ResourceEventsListener listener) {
 +    resourceListeners.remove(listener);
 +  }
 +
 +  /**
 +   * @return the listeners registered with the system
 +   */
 +  public List<ResourceEventsListener> getResourceListeners() {
 +    return resourceListeners;
 +  }
 +
 +  /**
 +   * Handles a particular event associated with a resource
 +   * 
 +   * @param event
 +   *          Resource event
 +   * @param resource
 +   *          resource on which event is generated
 +   */
 +  public void handleResourceEvent(ResourceEvent event, Object resource) {
 +    if (disableManagement) {
 +      return;
 +    }
 +    if (resourceListeners.size() == 0) {
 +      return;
 +    }
 +    notifyResourceEventListeners(event, resource);
 +  }
 +
 +  /** Returns true if system is a loner (for testing) */
 +  public boolean isLoner() {
 +    return this.isLoner;
 +  }
 +
 +  private MemoryAllocator offHeapStore = null;
 +  
 +  public MemoryAllocator getOffHeapStore() {
 +    return this.offHeapStore;
 +  }
 +  
 +  /**
 +   * Initializes this connection to a distributed system with the
 +   * current configuration state.
 +   */
 +  private void initialize() {
 +    if (this.originalConfig.getLocators().equals("")) {
 +      if (this.originalConfig.getMcastPort() != 0) {
 +        throw new GemFireConfigException("The "
 +                                          + DistributionConfig.LOCATORS_NAME
 +                                          + " attribute can not be empty when the "
 +                                          + DistributionConfig.MCAST_PORT_NAME
 +                                          + " attribute is non-zero.");
 +      } else {
 +        // no distribution
 +        this.isLoner = true;
 +      }
 +    }
 +
 +    this.config = new RuntimeDistributionConfigImpl(this);
 +    if (!this.isLoner) {
 +      this.attemptingToReconnect = (reconnectAttemptCounter > 0);
 +    }
 +    try {
 +    SocketCreator.getDefaultInstance(this.config);
 +
 +    // LOG: create LogWriterAppender(s) if log-file or security-log-file is specified
 +    final boolean hasLogFile = this.config.getLogFile() != null && !this.config.getLogFile().equals(new File(""));
 +    final boolean hasSecurityLogFile = this.config.getSecurityLogFile() != null && !this.config.getSecurityLogFile().equals(new File(""));
 +    LogService.configureLoggers(hasLogFile, hasSecurityLogFile);
 +    if (hasLogFile || hasSecurityLogFile) {
 +      
 +      // main log file
 +      if (hasLogFile) {
 +        // if log-file then create logWriterAppender
 +        this.logWriterAppender = LogWriterAppenders.getOrCreateAppender(LogWriterAppenders.Identifier.MAIN, this.isLoner, this.config, true);
 +      }
 +      
 +      // security log file
 +      if (hasSecurityLogFile) {
 +        // if security-log-file then create securityLogWriterAppender
 +        this.securityLogWriterAppender = LogWriterAppenders.getOrCreateAppender(LogWriterAppenders.Identifier.SECURITY, this.isLoner, this.config, false);
 +      } else {
 +        // let security route to regular log-file or stdout
 +      }
 +    }
 +    
 +    // LOG: create LogWriterLogger(s) for backwards compatibility of getLogWriter and getSecurityLogWriter
 +    if (this.logWriter == null) {
 +      this.logWriter = LogWriterFactory.createLogWriterLogger(this.isLoner, false, this.config, true);
 +      this.logWriter.fine("LogWriter is created.");
 +    }
 +    
 +//    logWriter.info("Created log writer for IDS@"+System.identityHashCode(this));
 +    
 +    if (this.securityLogWriter == null) {
 +      // LOG: whole new LogWriterLogger instance for security
 +      this.securityLogWriter = LogWriterFactory.createLogWriterLogger(this.isLoner, true, this.config, false);
 +      this.securityLogWriter.fine("SecurityLogWriter is created.");
 +    }
 +    
 +    Services.setLogWriter(this.logWriter);
 +    Services.setSecurityLogWriter(this.securityLogWriter);
 +
 +    this.clock = new DSClock(this.isLoner);
 +    
 +    if (this.attemptingToReconnect && logger.isDebugEnabled()) {
 +      logger.debug("This thread is initializing a new DistributedSystem in order to reconnect to other members");
 +    }
 +    // Note we need loners to load the license in case they are a
 +    // bridge server and will need to enforce the member limit
 +    if (Boolean.getBoolean(InternalLocator.FORCE_LOCATOR_DM_TYPE)) {
 +      this.locatorDMTypeForced = true;
 +    }
 +
 +    // Initialize the Diffie-Hellman and public/private keys
 +    try {
 +      HandShake.initCertsMap(this.config.getSecurityProps());
 +      HandShake.initPrivateKey(this.config.getSecurityProps());
 +      HandShake.initDHKeys(this.config);
 +    }
 +    catch (Exception ex) {
 +      throw new GemFireSecurityException(
 +        LocalizedStrings.InternalDistributedSystem_PROBLEM_IN_INITIALIZING_KEYS_FOR_CLIENT_AUTHENTICATION.toLocalizedString(), ex);
 +    }
 +
 +    final long offHeapMemorySize = OffHeapStorage.parseOffHeapMemorySize(getConfig().getOffHeapMemorySize());
 +
 +    this.offHeapStore = OffHeapStorage.createOffHeapStorage(getLogWriter(), this, offHeapMemorySize, this);
 +    
 +    // Note: this can only happen on a linux system
 +    if (getConfig().getLockMemory()) {
 +      // This calculation is not exact, but seems fairly close.  So far we have
 +      // not loaded much into the heap and the current RSS usage is already 
 +      // included the available memory calculation.
 +      long avail = LinuxProcFsStatistics.getAvailableMemory(logger);
 +      long size = offHeapMemorySize + Runtime.getRuntime().totalMemory();
 +      if (avail < size) {
 +        if (GemFireCacheImpl.ALLOW_MEMORY_LOCK_WHEN_OVERCOMMITTED) {
 +          logger.warn(LocalizedMessage.create(LocalizedStrings.InternalDistributedSystem_MEMORY_OVERCOMMIT_WARN, size - avail));
 +        } else {
 +          throw new IllegalStateException(LocalizedStrings.InternalDistributedSystem_MEMORY_OVERCOMMIT.toLocalizedString(avail, size));
 +        }
 +      }
 +      
 +      logger.info("Locking memory. This may take a while...");
 +      GemFireCacheImpl.lockMemory();
 +      logger.info("Finished locking memory.");
 +    }
 +
 +    try {
 +      startInitLocator();
 +    } catch (InterruptedException e) {
 +      throw new SystemConnectException("Startup has been interrupted", e);
 +    }
 +
 +    synchronized (this.isConnectedMutex) {
 +      this.isConnected = true;
 +    }
 +    
 +    if (!this.isLoner) {
 +      try {
 +        if (this.quorumChecker != null) {
 +          this.quorumChecker.suspend();
 +        }
 +        this.dm = DistributionManager.create(this);
 +        // fix bug #46324
 +        if (InternalLocator.hasLocator()) {
 +          InternalLocator locator = InternalLocator.getLocator();
 +          getDistributionManager().addHostedLocators(getDistributedMember(), InternalLocator.getLocatorStrings(), locator.isSharedConfigurationEnabled());
 +        }
 +      }
 +      finally {
 +        if (this.dm == null && this.quorumChecker != null) {
 +          this.quorumChecker.resume();
 +        }
 +        setDisconnected();
 +      }
 +    }
 +    else {
 +      this.dm = new LonerDistributionManager(this, this.logWriter);
 +    }
 +    
 +    Assert.assertTrue(this.dm != null);
 +    Assert.assertTrue(this.dm.getSystem() == this);
 +
 +    try {
 +      this.id = this.dm.getChannelId();
 +    } catch (DistributedSystemDisconnectedException e) {
 +      // bug #48144 - The dm's channel threw an NPE.  It now throws this exception
 +      // but during startup we should instead throw a SystemConnectException
 +      throw new SystemConnectException(
 +          LocalizedStrings.InternalDistributedSystem_DISTRIBUTED_SYSTEM_HAS_DISCONNECTED
 +            .toLocalizedString(), e);
 +    }
 +
 +    synchronized (this.isConnectedMutex) {
 +      this.isConnected = true;
 +    }
 +    if (attemptingToReconnect  &&  (this.startedLocator == null)) {
 +      try {
 +        startInitLocator();
 +      } catch (InterruptedException e) {
 +        throw new SystemConnectException("Startup has been interrupted", e);
 +      }
 +    }
 +    try {
 +      endInitLocator();
 +    }
 +    catch (IOException e) {
 +      throw new GemFireIOException("Problem finishing a locator service start", e);
 +    }
 +
 +      if (!statsDisabled) {
 +        // to fix bug 42527 we need a sampler
 +        // even if sampling is not enabled.
 +      this.sampler = new GemFireStatSampler(this);
 +      this.sampler.start();
 +    }
 +
 +    if (this.logWriterAppender != null) {
 +      LogWriterAppenders.startupComplete(LogWriterAppenders.Identifier.MAIN);
 +    }
 +    if (this.securityLogWriterAppender != null) {
 +      LogWriterAppenders.startupComplete(LogWriterAppenders.Identifier.SECURITY);
 +    }
 +    
 +    //this.logger.info("ds created", new RuntimeException("DEBUG: STACK"));
 +
 +    //Log any instantiators that were registered before the log writer
 +    //was created
 +    InternalInstantiator.logInstantiators();
 +    }
 +    catch (RuntimeException ex) {
 +      this.config.close();
 +      throw ex;
 +    }
 +    
 +    resourceListeners = new CopyOnWriteArrayList<ResourceEventsListener>();
 +    this.reconnected = this.attemptingToReconnect;
 +    this.attemptingToReconnect = false;
 +  }
 +
 +  /**
 +   * @since 5.7
 +   */
 +  private void startInitLocator() throws InterruptedException {
 +    String locatorString = this.originalConfig.getStartLocator();
 +    if (locatorString.length() > 0) {
 +      // when reconnecting we don't want to join with a colocated locator unless
 +      // there is a quorum of the old members available
 +      if (attemptingToReconnect && !this.isConnected) {
 +        if (this.quorumChecker != null) {
 +          logger.info("performing a quorum check to see if location services can be started early");
 +          if (!quorumChecker.checkForQuorum(3*this.config.getMemberTimeout())) {
 +            logger.info("quorum check failed - not allowing location services to start early");
 +            return;
 +          }
 +          logger.info("Quorum check passed - allowing location services to start early");
 +        }
 +      }
 +      DistributionLocatorId locId = new DistributionLocatorId(locatorString);
 +      try {
 +        this.startedLocator = InternalLocator.createLocator(
 +            locId.getPort(), 
 +            null,
 +            null,
 +            this.logWriter, // LOG: this is after IDS has created LogWriterLoggers and Appenders 
 +            this.securityLogWriter, // LOG: this is after IDS has created LogWriterLoggers and Appenders
 +            locId.getHost(),
 +            locId.getHostnameForClients(),
 +            this.originalConfig.toProperties(), false);
 +        if(locId.isPeerLocator()) {
 +          boolean startedPeerLocation = false;
 +          try {
 +            this.startedLocator.startPeerLocation(true);
 +            startedPeerLocation = true;
 +          } finally {
 +            if (!startedPeerLocation) {
 +              this.startedLocator.stop();
 +            }
 +          }
 +        }
 +      } catch (IOException e) {
 +        throw new GemFireIOException( LocalizedStrings.
 +          InternalDistributedSystem_PROBLEM_STARTING_A_LOCATOR_SERVICE
 +            .toLocalizedString(), e);
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * @since 5.7
 +   */
 +  private void endInitLocator() throws IOException {
 +    InternalLocator loc = this.startedLocator;
 +    if (loc != null) {
 +      String locatorString = this.originalConfig.getStartLocator();
 +//      DistributionLocatorId locId = new DistributionLocatorId(locatorString);
 +      boolean finished = false;
 +      try {
 +        // during the period when the product is using only paper licenses we always
 +        // start server location services in order to be able to log information
 +        // about the use of cache servers
 +        //      if(locId.isServerLocator()) {
 +        loc.startServerLocation(this);
 +        //      }
 +      
 +        loc.endStartLocator(this);
 +        finished = true;
 +      } finally {
 +        if (!finished) {
 +          loc.stop();
 +        }
 +      }
 +    }
 +  }
 +
 +  /** record a locator as a dependent of this distributed system */
 +  public void setDependentLocator(InternalLocator theLocator) {
 +    this.startedLocator = theLocator;
 +  }
 +  
 +  /** Used by DistributionManager to fix bug 33362
 +   */
 +  void setDM(DM dm) {
 +    this.dm = dm;
 +  }
 +
 +  /**
 +   * Checks whether or not this connection to a distributed system is
 +   * closed.
 +   *
 +   * @throws DistributedSystemDisconnectedException
 +   *         This connection has been {@link #disconnect(boolean, String, boolean) disconnected}
 +   */
 +  private void checkConnected() {
 +    if (!isConnected()) {
 +      throw new DistributedSystemDisconnectedException(LocalizedStrings.InternalDistributedSystem_THIS_CONNECTION_TO_A_DISTRIBUTED_SYSTEM_HAS_BEEN_DISCONNECTED.toLocalizedString(),
 +          dm.getRootCause());
 +    }
 +  }
 +
 +  @Override
 +  public boolean isConnected() {
 +    if (this.dm == null) {
 +      return false;
 +    }
 +    if (this.dm.getCancelCriterion().cancelInProgress() != null) {
 +      return false;
 +    }
 +    if (this.isDisconnecting) {
 +      return false;
 +    }
 +    return this.isConnected;
 +  }
 +  
 +  /**
 +   * This class defers to the DM.  If we don't have a DM, we're dead.
 +   * @author jpenney
 +   */
 +  protected class Stopper extends CancelCriterion {
 +    @Override
 +    public String cancelInProgress() {
 +      checkFailure();
 +      if (dm == null) {
 +        return "No dm";
 +      }
 +      return dm.getCancelCriterion().cancelInProgress();
 +    }
 +    
 +    @Override
 +    public RuntimeException generateCancelledException(Throwable e) {
 +      if (dm == null) {
 +        return new DistributedSystemDisconnectedException("no dm", e);
 +      }
 +      return dm.getCancelCriterion().generateCancelledException(e);
 +    }
 +  }
 +  
 +  /**
 +   * Handles all cancellation queries for this distributed system
 +   */
 +  private final Stopper stopper = new Stopper();
 +  
 +  @Override
 +  public CancelCriterion getCancelCriterion() {
 +    return stopper;
 +  }
 +  
 +  public boolean isDisconnecting() {
 +    if (this.dm == null) {
 +      return true;
 +    }
 +    if (this.dm.getCancelCriterion().cancelInProgress() != null) {
 +      return true;
 +    }
 +    if (!this.isConnected) {
 +      return true;
 +    }
 +    return this.isDisconnecting;
 +    }
 +
 +  @Override
 +  public LogWriter getLogWriter() {
 +    return this.logWriter;
 +  }
 +  
 +  public DSClock getClock() {
 +    return this.clock;
 +  }
 +
 +  @Override
 +  public LogWriter getSecurityLogWriter() {
 +    return this.securityLogWriter;
 +  }
 +
 +  /*
 +  public Cache myCache;
 +
 +  public void setCache(Cache cache){
 +	  myCache=cache;
 +  }
 +  public Cache getCache(){
 +	  return myCache;
 +  }
 +  */
 +  /**
 +   * Returns the stat sampler
 +   */
 +  public GemFireStatSampler getStatSampler() {
 +    return this.sampler;
 +  }
 +  
 +  /** Has this system started the disconnect process? */
 +  protected volatile boolean isDisconnecting = false;
 +  
 +  /**
 +   * Disconnects this VM from the distributed system.  Shuts down the
 +   * distribution manager, and if necessary,
 +   */
 +  @Override
 +  public void disconnect() {
 +    disconnect(false, LocalizedStrings.InternalDistributedSystem_NORMAL_DISCONNECT.toLocalizedString(), false);
 +  }
 +  
 +  /**
 +   * Disconnects this member from the distributed system when an internal
 +   * error has caused distribution to fail (e.g., this member was shunned)
 +   * @param reason a string describing why the disconnect is occurring
 +   * @param cause an optional exception showing the reason for abnormal disconnect
 +   * @param shunned whether this member was shunned by the membership coordinator
 +   */
 +  public void disconnect(String reason, Throwable cause, boolean shunned) {
 +    boolean isForcedDisconnect = dm.getRootCause() instanceof ForcedDisconnectException;
 +    boolean reconnected = false;
 +    this.reconnected = false;
 +    if (isForcedDisconnect) {
 +      this.forcedDisconnect = true;
 +      resetReconnectAttemptCounter();
 +    
 +     reconnected = tryReconnect(true, reason, GemFireCacheImpl.getInstance());
 +    }
 +    if (!reconnected) {
 +      disconnect(false, reason, shunned);
 +    }
 +  }
 +  
 +  /**
 +   * This is how much time, in milliseconds to allow a disconnect listener
 +   * to run before we interrupt it.
 +   */
 +  static private final long MAX_DISCONNECT_WAIT =
 +    Long.getLong("DistributionManager.DISCONNECT_WAIT", 
 +        10 * 1000).longValue();
 +
 +  /**
 +   * Run a disconnect listener, checking for errors and
 +   * honoring the timeout {@link #MAX_DISCONNECT_WAIT}.
 +   *
 +   * @param dc the listener to run
 +   */
 +  private void runDisconnect(final DisconnectListener dc,
 +      ThreadGroup tg) {
 +    // Create a general handler for running the disconnect
 +    Runnable r = new Runnable() {
 +      public void run() {
 +        try {
 +          disconnectListenerThread.set(Boolean.TRUE);
 +          dc.onDisconnect(InternalDistributedSystem.this);
 +        }
 +        catch (CancelException e) {
 +          if (logger.isDebugEnabled()) {
 +            logger.debug("Disconnect listener <{}> thwarted by cancellation: {}", dc, e, logger.isTraceEnabled() ? e : null);
 +          }
 +        }
 +      }
 +    };
 +
 +    // Launch it and wait a little bit
 +    Thread t = new Thread(tg, r, dc.toString());
 +    try {
 +      t.start();
 +      t.join(MAX_DISCONNECT_WAIT);
 +    }
 +    catch (InterruptedException e) {
 +      Thread.currentThread().interrupt();
 +      logger.warn(LocalizedMessage.create(LocalizedStrings.InternalDistributedSystem_INTERRUPTED_WHILE_PROCESSING_DISCONNECT_LISTENER), e);
 +    }
 +
 +    // Make sure the listener gets the cue to die
 +    if (t.isAlive()) {
 +      logger.warn(LocalizedMessage.create(LocalizedStrings.InternalDistributedSystem_DISCONNECT_LISTENER_STILL_RUNNING__0, dc));
 +      t.interrupt();
 +
 +      try {
 +        t.join(MAX_DISCONNECT_WAIT);
 +      }
 +      catch (InterruptedException e) {
 +        Thread.currentThread().interrupt();
 +      }
 +      
 +      if (t.isAlive()) {
 +        logger.warn(LocalizedMessage.create(LocalizedStrings.InternalDistributedSystem_DISCONNECT_LISTENER_IGNORED_ITS_INTERRUPT__0, dc));
 +      }
 +    }
 +    
 +  }
 +  
 +  public boolean isDisconnectListenerThread() {
 +    Boolean disconnectListenerThreadBoolean = 
 +      (Boolean) this.disconnectListenerThread.get();
 +
 +    return disconnectListenerThreadBoolean != null &&
 +           disconnectListenerThreadBoolean.booleanValue();
 +  }
 +  
 +  /**
 +   * Run a disconnect listener in the same thread sequence as the reconnect.
 +   * @param dc the listener to run
 +   * @param tg the thread group to run the listener in
 +   */
 +
 +  private void runDisconnectForReconnect(final DisconnectListener dc,
 +      ThreadGroup tg){
 +    try {
 +      dc.onDisconnect(InternalDistributedSystem.this);
 +    } catch (DistributedSystemDisconnectedException e) {
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("Disconnect listener <{}> thwarted by shutdown: {}", dc, e, logger.isTraceEnabled() ? e : null);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * A logging thread group for the disconnect and shutdown listeners
 +   */
 +  private final ThreadGroup disconnectListenerThreadGroup = 
 +    LoggingThreadGroup.createThreadGroup("Disconnect Listeners");
 +  
 +  /**
 +   * Disconnect cache, run disconnect listeners.
 +   * 
 +   * @param doReconnect whether a reconnect will be done
 +   * @param reason the reason that the system is disconnecting
 +   * 
 +   * @return a collection of shutdownListeners
 +   */
 +  private HashSet doDisconnects(boolean doReconnect, String reason) {
 +    // Make a pass over the disconnect listeners, asking them _politely_
 +    // to clean up.
 +    HashSet shutdownListeners = new HashSet();
 +    for (;;) {
 +      DisconnectListener listener = null;
 +      synchronized (this.listeners) {
 +        Iterator itr = listeners.iterator();
 +        if (!itr.hasNext()) {
 +          break;
 +        }
 +        listener = (DisconnectListener)itr.next();
 +        if (listener instanceof ShutdownListener) {
 +          shutdownListeners.add(listener);
 +        }
 +        itr.remove();
 +      } // synchronized
 +     
 +      if (doReconnect){
 +        runDisconnectForReconnect(listener, disconnectListenerThreadGroup);
 +      } else {
 +        runDisconnect(listener, disconnectListenerThreadGroup);
 +      }
 +    } // for
 +    return shutdownListeners;
 +  }
 +  
 +  /**
 +   * Process the shutdown listeners.  It is essential that the DM has been
 +   * shut down before calling this step, to ensure that no new listeners are
 +   * registering.
 +   * 
 +   * @param shutdownListeners shutdown listeners initially registered with us
 +   */
 +  private void doShutdownListeners(HashSet shutdownListeners) {
 +    if (shutdownListeners == null) {
 +      return;
 +    }
 +
 +    // Process any shutdown listeners we reaped during first pass
 +    Iterator it = shutdownListeners.iterator();
 +    while (it.hasNext()) {
 +      ShutdownListener s = (ShutdownListener)it.next();
 +      try {
 +        s.onShutdown(this);
 +      }
 +      catch (VirtualMachineError err) {
 +        SystemFailure.initiateFailure(err);
 +        // If this ever returns, rethrow the error.  We're poisoned
 +        // now, so don't let this thread continue.
 +        throw err;
 +      }
 +      catch (Throwable t) {
 +        // Whenever you catch Error or Throwable, you must also
 +        // catch VirtualMachineError (see above).  However, there is
 +        // _still_ a possibility that you are dealing with a cascading
 +        // error condition, so you also need to check to see if the JVM
 +        // is still usable:
 +        SystemFailure.checkFailure();
 +        // things could break since we continue, but we want to disconnect!
 +        logger.fatal(LocalizedMessage.create(LocalizedStrings.InternalDistributedSystem_SHUTDOWNLISTENER__0__THREW, s), t);
 +      }
 +    }
 +
 +    // During the window while we were running disconnect listeners, new
 +    // disconnect listeners may have appeared. After messagingDisabled is
 +    // set, no new ones will be created.  However, we must process any
 +    // that appeared in the interim.
 +    for (;;) {
 +      // Pluck next listener from the list
 +      DisconnectListener dcListener = null;
 +      ShutdownListener sdListener = null;
 +      synchronized (this.listeners) {
 +        Iterator itr = listeners.iterator();
 +        if (!itr.hasNext())
 +          break;
 +        dcListener = (DisconnectListener)itr.next();
 +        itr.remove();
 +        if (dcListener instanceof ShutdownListener)
 +          sdListener = (ShutdownListener)dcListener;
 +      }
 +
 +      // Run the disconnect
 +      runDisconnect(dcListener, disconnectListenerThreadGroup);
 +
 +      // Run the shutdown, if any
 +      if (sdListener != null) {
 +        try {
 +          // TODO: should we make sure this times out?
 +          sdListener.onShutdown(this);
 +        }
 +        catch (VirtualMachineError err) {
 +          SystemFailure.initiateFailure(err);
 +          // If this ever returns, rethrow the error.  We're poisoned
 +          // now, so don't let this thread continue.
 +          throw err;
 +        }
 +        catch (Throwable t) {
 +          // Whenever you catch Error or Throwable, you must also
 +          // catch VirtualMachineError (see above).  However, there is
 +          // _still_ a possibility that you are dealing with a cascading
 +          // error condition, so you also need to check to see if the JVM
 +          // is still usable:
 +          SystemFailure.checkFailure();
 +          // things could break since we continue, but we want to disconnect!
 +          logger.fatal(LocalizedMessage.create(LocalizedStrings.InternalDistributedSystem_DISCONNECTLISTENERSHUTDOWN_THREW), t);
 +        }
 +      }
 +    } // for
 +  }
 +  
 +  /**
 +   * break any potential circularity in {@link #loadEmergencyClasses()}
 +   */
 +  private static volatile boolean emergencyClassesLoaded = false;
 +  
 +  /**
 +   * Ensure that the MembershipManager class gets loaded.
 +   * 
 +   * @see SystemFailure#loadEmergencyClasses()
 +   */
 +  static public void loadEmergencyClasses() {
 +    if (emergencyClassesLoaded) return;
 +    emergencyClassesLoaded = true;
 +    GMSMembershipManager.loadEmergencyClasses();
 +  }
 +  
 +  /**
 +   * Closes the membership manager
 +   * 
 +   * @see SystemFailure#emergencyClose()
 +   */
 +  public void emergencyClose() {
 +    final boolean DEBUG = SystemFailure.TRACE_CLOSE;
 +    if (dm != null) {
 +      MembershipManager mm = dm.getMembershipManager();
 +      if (mm != null) {
 +        if (DEBUG) {
 +          System.err.println("DEBUG: closing membership manager");
 +        }
 +        mm.emergencyClose();
 +        if (DEBUG) {
 +          System.err.println("DEBUG: back from closing membership manager");
 +        }
 +      }
 +    }
 +    
 +    // Garbage collection
 +    // Leave dm alone; its CancelCriterion will help people die
 +    this.isConnected = false;
 +    if (dm != null) {
 +      dm.setRootCause(SystemFailure.getFailure());
 +    }
 +    this.isDisconnecting = true;
 +    this.listeners.clear();
 +    if (DEBUG) {
 +      System.err.println("DEBUG: done with InternalDistributedSystem#emergencyClose");
 +    }
 +  }
 +  
 +  private void setDisconnected() {
 +    synchronized (this.isConnectedMutex) {
 +      this.isConnected = false;
 +      isConnectedMutex.notifyAll();
 +    }
 +  }
 +
 +  private void waitDisconnected() {
 +    synchronized (this.isConnectedMutex) {
 +      while (this.isConnected) {
 +        boolean interrupted = Thread.interrupted();
 +        try {
 +          this.isConnectedMutex.wait();
 +        }
 +        catch (InterruptedException e) {
 +          interrupted = true;
 +          getLogWriter().convertToLogWriterI18n().warning(
 +            LocalizedStrings.InternalDistributedSystem_DISCONNECT_WAIT_INTERRUPTED, e);
 +        }
 +        finally {
 +          if (interrupted) {
 +            Thread.currentThread().interrupt();
 +          }
 +        }
 +      } // while
 +    }
 +  }
 +
 +  /**
 +   * Disconnects this VM from the distributed system. Shuts down the
 +   * distribution manager.
 +   * 
 +   * @param preparingForReconnect
 +   *          true if called by a reconnect operation
 +   * @param reason
 +   *          the reason the disconnect is being performed
 +   * @param keepAlive
 +   *          true if user requested durable subscriptions are to be retained at
 +   *          server.
 +   */
 +  protected void disconnect(boolean preparingForReconnect, String reason, boolean keepAlive) {
 +    boolean isShutdownHook = (shutdownHook != null)
 +                             && (Thread.currentThread() == shutdownHook);
 +
 +    if (!preparingForReconnect) {
 +//      logger.info("disconnecting IDS@"+System.identityHashCode(this));
 +      synchronized(reconnectListeners) {
 +        reconnectListeners.clear();
 +      }
 +      cancelReconnect();
 +    }
 +    
 +    final boolean isDebugEnabled = logger.isDebugEnabled();
 +    try {
 +      HashSet shutdownListeners = null;
 +      try {
 +        if (isDebugEnabled) {
 +          logger.debug("DistributedSystem.disconnect invoked on {}", this);
 +        }
 +        synchronized (GemFireCacheImpl.class) {
 +          // bug 36955, 37014: don't use a disconnect listener on the cache;
 +          // it takes too long.
 +          //
 +          // However, make sure cache is completely closed before starting
 +          // the distributed system close.
 +          GemFireCacheImpl currentCache = GemFireCacheImpl.getInstance();
 +          if (currentCache != null && !currentCache.isClosed()) {
 +            disconnectListenerThread.set(Boolean.TRUE); // bug #42663 - this must be set while closing the cache
 +            try {
 +              currentCache.close(reason, dm.getRootCause(), keepAlive, true); // fix for 42150
 +            } 
 +            catch (VirtualMachineError e) {
 +              SystemFailure.initiateFailure(e);
 +              throw e;
 +            }
 +            catch (Throwable e) {
 +              SystemFailure.checkFailure();
 +              // Whenever you catch Error or Throwable, you must also
 +              // check for fatal JVM error (see above).  However, there is
 +              logger.warn(LocalizedMessage.create(LocalizedStrings.InternalDistributedSystem_EXCEPTION_TRYING_TO_CLOSE_CACHE), e);
 +            }
 +            finally {
 +              disconnectListenerThread.set(Boolean.FALSE);
 +            }
 +          } 
 +
 +          // While still holding the lock, make sure this instance is
 +          // marked as shutting down
 +          synchronized (this) {
 +            if (this.isDisconnecting) {
 +              // It's already started, but don't return
 +              // to the caller until it has completed.
 +              waitDisconnected();
 +              return;
 +            } // isDisconnecting
 +            this.isDisconnecting = true;
 +
 +            if (!preparingForReconnect) {
 +              // move cancelReconnect above this synchronized block fix for bug 35202
 +              if (this.reconnectDS != null) {
 +                // break recursion
 +                if (isDebugEnabled) {
 +                  logger.debug("disconnecting reconnected DS: {}", this.reconnectDS);
 +                }
 +                InternalDistributedSystem r = this.reconnectDS;
 +                this.reconnectDS = null;
 +                r.disconnect(false, null, false);
 +              }
 +            } // !reconnect
 +          } // synchronized (this)
 +        } // synchronized (GemFireCache.class)
 +
 +        if (!isShutdownHook) {
 +          shutdownListeners = doDisconnects(attemptingToReconnect, reason);
 +        }
 +    
 +        if (!this.attemptingToReconnect) {
 +          if (this.logWriterAppender != null) {
 +            LogWriterAppenders.stop(LogWriterAppenders.Identifier.MAIN);
 +          }
 +          if (this.securityLogWriterAppender != null) {
 +            // LOG:SECURITY: old code did NOT invoke this
 +            LogWriterAppenders.stop(LogWriterAppenders.Identifier.SECURITY);
 +          }
 +        }
 +        
 +        AlertAppender.getInstance().shuttingDown();
 +        
 +      }
 +      finally { // be ABSOLUTELY CERTAIN that dm closed
 +        try {
 +          // Do the bulk of the close...
 +          this.dm.close();
 +          // we close the locator after the DM so that when split-brain detection
 +          // is enabled, loss of the locator doesn't cause the DM to croak
 +          if (this.startedLocator != null) {
 +            this.startedLocator.stop(forcedDisconnect, preparingForReconnect, false);
 +            this.startedLocator = null;
 +          }
 +        } finally { // timer canceled
 +          // bug 38501: this has to happen *after*
 +          // the DM is closed :-(
 +          if (!preparingForReconnect) {
 +            SystemTimer.cancelSwarm(this);
 +          }
 +        } // finally timer cancelled
 +      } // finally dm closed
 +
 +      if (!isShutdownHook) {
 +        doShutdownListeners(shutdownListeners);
 +      }
 +
 +      // closing the Aggregate stats
 +      if(functionServiceStats != null){
 +        functionServiceStats.close();
 +      }
 +      // closing individual function stats
 +      for (FunctionStats functionstats : functionExecutionStatsMap.values()) {
 +        functionstats.close();
 +      }
 +
 +      (new FunctionServiceManager()).unregisterAllFunctions();
 +
 +      if (this.sampler != null) {
 +        this.sampler.stop();
 +        this.sampler = null;
 +      }
 +
 +      if (!this.attemptingToReconnect) {
 +        if (this.logWriterAppender != null) {
 +          LogWriterAppenders.destroy(LogWriterAppenders.Identifier.MAIN);
 +        }
 +        if (this.securityLogWriterAppender != null) {
 +          LogWriterAppenders.destroy(LogWriterAppenders.Identifier.SECURITY);
 +        }
 +      }
 +
 +      // NOTE: no logging after this point :-)
 +
 +      LoggingThreadGroup.cleanUpThreadGroups(); // bug35388 - logwriters accumulate, causing mem leak
 +      EventID.unsetDS();
 +
 +    }
 +    finally {
 +      try {
 +        if (getOffHeapStore() != null) {
 +          getOffHeapStore().close();
 +        }
 +      } finally {
 +      try {
 +        removeSystem(this);
 +        // Close the config object
 +        this.config.close();
 +      }
 +      finally {
 +        // Finally, mark ourselves as disconnected
 +        setDisconnected();
 +        SystemFailure.stopThreads();
 +      }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Returns the distribution manager for accessing this distributed system.
 +   */
 +  public DM getDistributionManager() {
 +    checkConnected();
 +    return this.dm;
 +  }
 +
 +  /**
 +   * Returns the distribution manager without checking for connected or not so
 +   * can also return null.
 +   */
 +  public final DM getDM() {
 +    return this.dm;
 +  }
 +  
 +  /**
 +   * If this DistributedSystem is attempting to reconnect to the distributed system
 +   * this will return the quorum checker created by the old MembershipManager for
 +   * checking to see if a quorum of old members can be reached.
 +   * @return the quorum checking service
 +   */
 +  public final QuorumChecker getQuorumChecker() {
 +    return this.quorumChecker;
 +  }
 +  
 +  /**
 +   * Returns true if this DS has been attempting to reconnect but
 +   * the attempt has been cancelled.
 +   */
 +  public boolean isReconnectCancelled() {
 +    return this.reconnectCancelled;
 +  }
 +
 +  /**
 +   * Returns whether or not this distributed system has the same
 +   * configuration as the given set of properties.
 +   *
 +   * @see DistributedSystem#connect
 +   */
 +  public boolean sameAs(Properties props, boolean isConnected) {
 +    return originalConfig.sameAs(DistributionConfigImpl.produce(props, isConnected));
 +  }
 +
 +  public boolean threadOwnsResources() {
 +    Boolean b = ConnectionTable.getThreadOwnsResourcesRegistration();
 +    if (b == null) {
 +      // thread does not have a preference so return default
 +      return !this.shareSockets;
 +    } else {
 +      return b.booleanValue();
 +    }
 +  }
 +
 +  /**
 +   * Returns whether or not the given configuration properties refer
 +   * to the same distributed system as this
 +   * <code>InternalDistributedSystem</code> connection.
 +   *
 +   * @since 4.0
 +   */
 +  public boolean sameSystemAs(Properties props) {
 +    DistributionConfig other = DistributionConfigImpl.produce(props);
 +    DistributionConfig me = this.getConfig();
 +
 +    if (!me.getBindAddress().equals(other.getBindAddress())) {
 +      return false;
 +    }
 +
 +    // @todo Do we need to compare SSL properties?
 +
 +    // locators
 +    String myLocators = me.getLocators();
 +    String otherLocators = other.getLocators();
 +
 +    // quick check
 +    if (myLocators.equals(otherLocators)) {
 +      return true;
 +
 +    } else {
 +      myLocators = canonicalizeLocators(myLocators);
 +      otherLocators = canonicalizeLocators(otherLocators);
 +
 +      return myLocators.equals(otherLocators);
 +    }
 +  }
 +
 +  /**
 +   * Canonicalizes a locators string so that they may be compared.
 +   *
 +   * @since 4.0
 +   */
 +  private static String canonicalizeLocators(String locators) {
 +    SortedSet sorted = new TreeSet();
 +    StringTokenizer st = new StringTokenizer(locators, ",");
 +    while (st.hasMoreTokens()) {
 +      String l = st.nextToken();
 +      StringBuffer canonical = new StringBuffer();
 +      DistributionLocatorId locId = new DistributionLocatorId(l);
 +      String addr = locId.getBindAddress();
 +      if (addr != null && addr.trim().length() > 0) {
 +        canonical.append(addr);
 +      }
 +      else {
 +        canonical.append(locId.getHost().getHostAddress());
 +      }
 +      canonical.append("[");
 +      canonical.append(String.valueOf(locId.getPort()));
 +      canonical.append("]");
 +      sorted.add(canonical.toString());
 +    }
 +
 +    StringBuffer sb = new StringBuffer();
 +    for (Iterator iter = sorted.iterator(); iter.hasNext(); ) {
 +      sb.append((String) iter.next());
 +      if (iter.hasNext()) {
 +        sb.append(",");
 +      }
 +    }
 +    return sb.toString();
 +  }
 +
 +  private final StoppableReentrantLock elderLock;
 +  private final StoppableCondition elderLockCondition;
 +  
 +  public StoppableReentrantLock getElderLock() {
 +    return elderLock;
 +  }
 +  public StoppableCondition getElderLockCondition() {
 +    return elderLockCondition;
 +  }
 +  
 +  /**
 +   * Returns the current configuration of this distributed system.
 +   */
 +  public DistributionConfig getConfig() {
 +    return this.config;
 +  }
 +
 +  /**
 +   * Returns the id of this connection to the distributed system.
 +   * This is actually the port of the distribution manager's
 +   * distribution channel.
 +   *
 +   * @see com.gemstone.gemfire.distributed.internal.DistributionChannel#getId
 +   */
 +  @Override
 +  public long getId() {
 +    return this.id;
 +  }
 +
 +  /**
 +   * Returns the string value of the distribution manager's id.
 +   */
 +  @Override
 +  public String getMemberId() {
 +    return String.valueOf(this.dm.getId());
 +  }
 +
 +  @Override
 +  public InternalDistributedMember getDistributedMember() {
 +    return this.dm.getId();
 +  }
 +  @Override
 +  public Set<DistributedMember> getAllOtherMembers() {
 +    return dm.getAllOtherMembers();
 +  }
 +  @Override
 +  public Set<DistributedMember> getGroupMembers(String group) {
 +    return dm.getGroupMembers(group);
 +  }
 +  
 +  
 +
 +  @Override
 +  public Set<DistributedMember> findDistributedMembers(InetAddress address) {
 +    Set<InternalDistributedMember> allMembers = dm.getDistributionManagerIdsIncludingAdmin();
 +    Set<DistributedMember> results = new HashSet<DistributedMember>(2);
 +    
 +    //Search through the set of all members
 +    for(InternalDistributedMember member: allMembers) {
 +      
 +      Set<InetAddress> equivalentAddresses = dm.getEquivalents(member.getInetAddress());
 +      //Check to see if the passed in address is matches one of the addresses on
 +      //the given member.
 +      if(address.equals(member.getInetAddress()) || equivalentAddresses.contains(address)) {
 +        results.add(member);
 +      }
 +    }
 +    
 +    return results;
 +  }
 +
 +  @Override
 +  public DistributedMember findDistributedMember(String name) {
 +    Set<DistributedMember> allMembers = dm.getDistributionManagerIdsIncludingAdmin();
 +    for(DistributedMember member : allMembers) {
 +      if(member.getName().equals(name)) {
 +        return member;
 +      }
 +    }
 +    return null;
 +  }
 +
 +  /**
 +   * Returns the configuration this distributed system was created with.
 +   */
 +  public DistributionConfig getOriginalConfig() {
 +    return this.originalConfig;
 +  }
 +  @Override
 +  public String getName() {
 +    return getOriginalConfig().getName();
 +  }
 +
 +  ///////////////////////  Utility Methods  ///////////////////////
 +
 +  /**
 +   * Since {@link DistributedSystem#connect} guarantees that there is
 +   * a canonical instance of <code>DistributedSystem</code> for each
 +   * configuration, we can use the default implementation of
 +   * <code>equals</code>.
 +   *
 +   * @see #sameAs
 +   */
 +  @Override
 +  public boolean equals(Object o) {
 +    return super.equals(o);
 +  }
 +
 +  /**
 +   * Since we use the default implementation of {@link #equals
 +   * equals}, we can use the default implementation of
 +   * <code>hashCode</code>.
 +   */
 +  @Override
 +  public int hashCode() {
 +    return super.hashCode();
 +  }
 +
 +  /**
 +   * Returns a string describing this connection to distributed system
 +   * (including highlights of its configuration).
 +   */
 +  @Override
 +  public String toString() {
 +    StringBuffer sb = new StringBuffer();
 +    sb.append("Connected ");
 +    String name = this.getName();
 +    if (name != null && !name.equals("")) {
 +      sb.append("\"");
 +      sb.append(name);
 +      sb.append("\" ");
 +    }
 +    sb.append("(id=");
 +    sb.append(Integer.toHexString(System.identityHashCode(this)));
 +    sb.append(") ");
 +
 +    sb.append("to distributed system using ");
 +    int port = this.config.getMcastPort();
 +    if (port != 0) {
 +      sb.append("multicast port ");
 +      sb.append(port);
 +      sb.append(" ");
 +
 +    } else {
 +      sb.append("locators \"");
 +      sb.append(this.config.getLocators());
 +      sb.append("\" ");
 +    }
 +
 +    File logFile = this.config.getLogFile();
 +    sb.append("logging to ");
 +    if (logFile == null || logFile.equals(new File(""))) {
 +      sb.append("standard out ");
 +
 +    } else {
 +      sb.append(logFile);
 +      sb.append(" ");
 +    }
 +
 +    sb.append(" started at ");
 +    sb.append((new Date(this.startTime)).toString());
 +    
 +    if (!this.isConnected()) {
 +      sb.append(" (closed)");
 +    }
 +
 +    return sb.toString().trim();
 +  }
 +
 +  private final ArrayList<Statistics> statsList = new ArrayList<Statistics>();
 +  private int statsListModCount = 0;
 +  private long statsListUniqueId = 1;
 +  private final Object statsListUniqueIdLock = new Object();
 +  
 +  // As the function execution stats can be lot in number, its better to put
 +  // them in a map so that it will be accessible immediately
 +  private final ConcurrentHashMap<String, FunctionStats>  functionExecutionStatsMap = new ConcurrentHashMap<String, FunctionStats>();
 +  private FunctionServiceStats functionServiceStats = null;
 +  
 +  public int getStatListModCount() {
 +    return this.statsListModCount;
 +  }
 +  public List<Statistics> getStatsList() {
 +    return this.statsList;
 +  }
 +
 +  @Override
 +  public final int getStatisticsCount() {
 +    int result = 0;
 +    List<Statistics> statsList = this.statsList;
 +    if (statsList != null) {
 +      result = statsList.size();
 +    }
 +    return result;
 +  }
 +  
 +  @Override
 +  public final Statistics findStatistics(long id) {
 +    List<Statistics> statsList = this.statsList;
 +    synchronized (statsList) {
 +      for (Statistics s : statsList) {
 +        if (s.getUniqueId() == id) {
 +          return s;
 +        }
 +      }
 +    }
 +    throw new RuntimeException(LocalizedStrings.PureStatSampler_COULD_NOT_FIND_STATISTICS_INSTANCE.toLocalizedString());
 +  }
 +  
 +  @Override
 +  public final boolean statisticsExists(long id) {
 +    List<Statistics> statsList = this.statsList;
 +    synchronized (statsList) {
 +      for (Statistics s : statsList) {
 +        if (s.getUniqueId() == id) {
 +          return true;
 +        }
 +      }
 +    }
 +    return false;
 +  }
 +
 +  @Override
 +  public final Statistics[] getStatistics() {
 +    List<Statistics> statsList = this.statsList;
 +    synchronized (statsList) {
 +      return (Statistics[])statsList.toArray(new Statistics[statsList.size()]);
 +    }
 +  }
 +  
 +  // StatisticsFactory methods
 +  public Statistics createStatistics(StatisticsType type) {
 +    return createOsStatistics(type, null, 0, 0);
 +  }
 +  public Statistics createStatistics(StatisticsType type, String textId) {
 +    return createOsStatistics(type, textId, 0, 0);
 +  }
 +  public Statistics createStatistics(StatisticsType type, String textId, long numericId) {
 +    return createOsStatistics(type, textId, numericId, 0);
 +  }
 +  public Statistics createOsStatistics(StatisticsType type, String textId, long numericId, int osStatFlags) {
 +    if (this.statsDisabled) {
 +      return new DummyStatisticsImpl(type, textId, numericId);
 +    }
 +    long myUniqueId;
 +    synchronized (statsListUniqueIdLock) {
 +      myUniqueId = statsListUniqueId++; // fix for bug 30597
 +    }
 +    Statistics result = new LocalStatisticsImpl(type, textId, numericId, myUniqueId, false, osStatFlags, this);
 +    synchronized (statsList) {
 +      statsList.add(result);
 +      statsListModCount++;
 +    }
 +    return result;
 +  }
 +
 +  public FunctionStats getFunctionStats(String textId) {
 +    FunctionStats stats = (FunctionStats)functionExecutionStatsMap.get(textId);
 +    if (stats == null) {
 +      stats = new FunctionStats(this, textId);
 +      FunctionStats oldStats = functionExecutionStatsMap.putIfAbsent(textId,
 +          stats);
 +      if (oldStats != null) {
 +        stats.close();
 +        stats = oldStats;
 +      }
 +    }
 +    return stats;
 +  }
 +
 +  
 +  public FunctionServiceStats getFunctionServiceStats() {
 +    if (functionServiceStats == null) {
 +      synchronized (this) {
 +        if(functionServiceStats == null){
 +          functionServiceStats = new FunctionServiceStats(this, "FunctionExecution");
 +        }
 +      }
 +    }
 +    return functionServiceStats;
 +  }
 +
 +  /**
 +   * For every registered statistic instance call the specified visitor.
 +   * This method was added to fix bug 40358
 +   */
 +  public void visitStatistics(StatisticsVisitor visitor) {
 +    synchronized (this.statsList) {
 +      for (Statistics s: this.statsList) {
 +        visitor.visit(s);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Used to "visit" each instance of Statistics registered with
 +   * @see #visitStatistics
 +   */
 +  public interface StatisticsVisitor {
 +    public void visit(Statistics stat);
 +  }
 +  
 +  public Set<String> getAllFunctionExecutionIds() {
 +    return functionExecutionStatsMap.keySet();
 +  }
 +  
 +  
 +  public Statistics[] findStatisticsByType(final StatisticsType type) {
 +    final ArrayList hits = new ArrayList();
 +    visitStatistics(new StatisticsVisitor() {
 +        public void visit(Statistics s) {
 +          if (type == s.getType()) {
 +            hits.add(s);
 +          }
 +        }
 +      });
 +    Statistics[] result = new Statistics[hits.size()];
 +    return (Statistics[])hits.toArray(result);
 +  }
 +  
 +  public Statistics[] findStatisticsByTextId(final String textId) {
 +    final ArrayList hits = new ArrayList();
 +    visitStatistics(new StatisticsVisitor() {
 +        public void visit(Statistics s) {
 +          if (s.getTextId().equals(textId)) {
 +            hits.add(s);
 +          }
 +        }
 +      });
 +    Statistics[] result = new Statistics[hits.size()];
 +    return (Statistics[])hits.toArray(result);
 +  }
 +  public Statistics[] findStatisticsByNumericId(final long numericId) {
 +    final ArrayList hits = new ArrayList();
 +    visitStatistics(new StatisticsVisitor() {
 +        public void visit(Statistics s) {
 +          if (numericId == s.getNumericId()) {
 +            hits.add(s);
 +          }
 +        }
 +      });
 +    Statistics[] result = new Statistics[hits.size()];
 +    return (Statistics[])hits.toArray(result);
 +  }
 +  public Statistics findStatisticsByUniqueId(final long uniqueId) {
 +    synchronized (this.statsList) {
 +      for (Statistics s: this.statsList) {
 +        if (uniqueId == s.getUniqueId()) {
 +          return s;
 +        }
 +      }
 +    }
 +    return null;
 +  }
 +
 +  /** for internal use only. Its called by {@link LocalStatisticsImpl#close}. */
 +  public void destroyStatistics(Statistics stats) {
 +    synchronized (statsList) {
 +      if (statsList.remove(stats)) {
 +        statsListModCount++;
 +      }
 +    }
 +  }
 +
 +  public Statistics createAtomicStatistics(StatisticsType type) {
 +    return createAtomicStatistics(type, null, 0);
 +  }
 +  public Statistics createAtomicStatistics(StatisticsType type, String textId) {
 +    return createAtomicStatistics(type, textId, 0);
 +  }
 +  public Statistics createAtomicStatistics(StatisticsType type, String textId, long numericId) {
 +    if (this.statsDisabled) {
 +      return new DummyStatisticsImpl(type, textId, numericId);
 +    }
 +    
 +    long myUniqueId;
 +    synchronized (statsListUniqueIdLock) {
 +      myUniqueId = statsListUniqueId++; // fix for bug 30597
 +    }
 +    Statistics result = StatisticsImpl.createAtomicNoOS(type, textId, numericId, myUniqueId, this);
 +    synchronized (statsList) {
 +      statsList.add(result);
 +      statsListModCount++;
 +    }
 +    return result;
 +  }
 +
 +
 +  // StatisticsTypeFactory methods
 +  private final static StatisticsTypeFactory tf = StatisticsTypeFactoryImpl.singleton();
 +
 +  /**
 +   * Creates or finds a StatisticType for the given shared class.
 +   */
 +  public StatisticsType createType(String name, String description,
 +                                   StatisticDescriptor[] stats) {
 +    return tf.createType(name, description, stats);
 +  }
 +  public StatisticsType findType(String name) {
 +    return tf.findType(name);
 +  }
 +  public StatisticsType[] createTypesFromXml(Reader reader)
 +    throws IOException {
 +    return tf.createTypesFromXml(reader);
 +  }
 +
 +  public StatisticDescriptor createIntCounter(String name, String description,
 +                                              String units) {
 +    return tf.createIntCounter(name, description, units);
 +  }
 +  public StatisticDescriptor createLongCounter(String name, String description,
 +                                               String units) {
 +    return tf.createLongCounter(name, description, units);
 +  }
 +  public StatisticDescriptor createDoubleCounter(String name, String description,
 +                                                 String units) {
 +    return tf.createDoubleCounter(name, description, units);
 +  }
 +  public StatisticDescriptor createIntGauge(String name, String description,
 +                                            String units) {
 +    return tf.createIntGauge(name, description, units);
 +  }
 +  public StatisticDescriptor createLongGauge(String name, String description,
 +                                             String units) {
 +    return tf.createLongGauge(name, description, units);
 +  }
 +  public StatisticDescriptor createDoubleGauge(String name, String description,
 +                                               String units) {
 +    return tf.createDoubleGauge(name, description, units);
 +  }
 +  public StatisticDescriptor createIntCounter(String name, String description,
 +                                              String units, boolean largerBetter) {
 +    return tf.createIntCounter(name, description, units, largerBetter);
 +  }
 +  public StatisticDescriptor createLongCounter(String name, String description,
 +                                               String units, boolean largerBetter) {
 +    return tf.createLongCounter(name, description, units, largerBetter);
 +  }
 +  public StatisticDescriptor createDoubleCounter(String name, String description,
 +                                                 String units, boolean largerBetter) {
 +    return tf.createDoubleCounter(name, description, units, largerBetter);
 +  }
 +  public StatisticDescriptor createIntGauge(String name, String description,
 +                                            String units, boolean largerBetter) {
 +    return tf.createIntGauge(name, description, units, largerBetter);
 +  }
 +  public StatisticDescriptor createLongGauge(String name, String description,
 +                                             String units, boolean largerBetter) {
 +    return tf.createLongGauge(name, description, units, largerBetter);
 +  }
 +  public StatisticDescriptor createDoubleGauge(String name, String description,
 +                                               String units, boolean largerBetter) {
 +    return tf.createDoubleGauge(name, description, units, largerBetter);
 +  }
 +
 +  public long getStartTime() {
 +    return this.startTime;
 +  }
 +
 +  /**
 +   * Makes note of a <code>ConnectListener</code> whose
 +   * <code>onConnect</code> method will be invoked when a connection is
 +   * created to a distributed system.
 +   * @return set of currently existing system connections
 +   */
 +  public static List addConnectListener(ConnectListener listener) {
 +    synchronized (existingSystemsLock) {
 +      synchronized (connectListeners) {
 +        connectListeners.add(listener);
 +        return existingSystems;
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Makes note of a <code>ReconnectListener</code> whose
 +   * <code>onReconnect</code> method will be invoked when a connection is
 +   * recreated to a distributed system during auto-reconnect.<p>
 +   * 
 +   * The ReconnectListener set is cleared after a disconnect.
 +   */
 +  public static void addReconnectListener(ReconnectListener listener) {
 +//    (new ManagerLogWriter(LogWriterImpl.FINE_LEVEL, System.out)).fine("registering reconnect listener: " + listener);
 +    synchronized (existingSystemsLock) {
 +      synchronized (reconnectListeners) {
 +        reconnectListeners.add(listener);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Removes a <code>ConnectListener</code> from the list of
 +   * listeners that will be notified when a connection is created to
 +   * a distributed system.
 +   * @return true if listener was in the list
 +   */
 +  public static boolean removeConnectListener(ConnectListener listener) {
 +    synchronized (connectListeners) {
 +      return connectListeners.remove(listener);
 +    }
 +  }
 +
 +  /**
 +   * Notifies all registered <code>ConnectListener</code>s that a
 +   * connection to a distributed system has been created.
 +   */
 +  private static void notifyConnectListeners(InternalDistributedSystem sys) {
 +    synchronized (connectListeners) {
 +      for (Iterator iter = connectListeners.iterator(); iter.hasNext();) {
 +        try {
 +          ConnectListener listener = (ConnectListener) iter.next();
 +          listener.onConnect(sys);
 +        }
 +        catch (VirtualMachineError err) {
 +          SystemFailure.initiateFailure(err);
 +          // If this ever returns, rethrow the error.  We're poisoned
 +          // now, so don't let this thread continue.
 +          throw err;
 +        }
 +        catch (Throwable t) {
 +          // Whenever you catch Error or Throwable, you must also
 +          // catch VirtualMachineError (see above).  However, there is
 +          // _still_ a possibility that you are dealing with a cascading
 +          // error condition, so you also need to check to see if the JVM
 +          // is still usable:
 +          SystemFailure.checkFailure();
 +          sys.getLogWriter().convertToLogWriterI18n().severe(LocalizedStrings.InternalDistributedSystem_CONNECTLISTENER_THREW, t);
 +        }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Removes a <code>ReconnectListener</code> from the list of
 +   * listeners that will be notified when a connection is recreated to
 +   * a distributed system.
 +   */
 +  public static void removeReconnectListener(ReconnectListener listener) {
 +    synchronized (reconnectListeners) {
 +      reconnectListeners.remove(listener);
 +    }
 +  }
 +
 +  /**
 +   * Notifies all registered <code>ReconnectListener</code>s that a
 +   * connection to a distributed system has been recreated.
 +   */
 +  private static void notifyReconnectListeners(InternalDistributedSystem oldsys, InternalDistributedSystem newsys, boolean starting) {
 +    List<ReconnectListener> listeners;
 +    synchronized (reconnectListeners) {
 +      listeners = new ArrayList<ReconnectListener>(reconnectListeners);
 +    }
 +    for (ReconnectListener listener: listeners) {
 +      try {
 +        if (starting) {
 +          listener.reconnecting(oldsys);
 +        } else {
 +          listener.onReconnect(oldsys, newsys);
 +        }
 +      } catch (Throwable t) {
 +        Error err;
 +        if (t instanceof OutOfMemoryError || t instanceof UnknownError) {
 +          err = (Error)t;
 +          SystemFailure.initiateFailure(err);
 +          // If this ever returns, rethrow the error. We're poisoned
 +          // now, so don't let this thread continue.
 +          throw err;
 +        }
 +        // Whenever you catch Error or Throwable, you must also
 +        // check for fatal JVM error (see above).  However, there is
 +        // _still_ a possibility that you are dealing with a cascading
 +        // error condition, so you also need to check to see if the JVM
 +        // is still usable:
 +        SystemFailure.checkFailure();
 +        logger.fatal(LocalizedMessage.create(LocalizedStrings.InternalDistributedSystem_CONNECTLISTENER_THREW), t);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Notifies all resource event listeners. All exceptions are caught here and
 +   * only a warning message is printed in the log
 +   * 
 +   * @param event
 +   *          Enumeration depicting particular resource event
 +   * @param resource
 +   *          the actual resource object.
 +   */
 +  private void notifyResourceEventListeners(ResourceEvent event, Object resource) {
 +    for (Iterator<ResourceEventsListener> iter = resourceListeners.iterator(); iter
 +        .hasNext();) {
 +      try {
 +        ResourceEventsListener listener = (ResourceEventsListener) iter.next();
 +        listener.handleEvent(event, resource);
 +      } catch(CancelException e) {
 +        //ignore
 +      } catch (ManagementException ex) {
 +        if (event == ResourceEvent.CACHE_CREATE) {
 +          throw ex;
 +        } else {
 +          logger.warn(ex.getMessage(), ex);
 +        }
 +      } catch (Exception err) {
 +        logger.warn(err.getMessage(), err);
 +      } catch (VirtualMachineError e) {
 +        SystemFailure.initiateFailure(e);
 +        throw e;
 +      } catch (Throwable t) {
 +        SystemFailure.checkFailure();
 +        logger.warn(t.getMessage(), t);
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * sqlfire's disconnect listener is invoked before the cache is closed when
 +   * there is a forced disconnect
 +   */
 +  public void setSqlfForcedDisconnectListener(DisconnectListener listener) {
 +    synchronized(this.listeners) { 
 +      this.sqlfDisconnectListener = listener;
 +    }
 +  }
 +  
 +  private void notifySqlfForcedDisconnectListener() {
 +    if (this.sqlfDisconnectListener != null) {
 +      final boolean isDebugEnabled = logger.isDebugEnabled();
 +      try {
 +        if (isDebugEnabled) {
 +          logger.debug("notifying sql disconnect listener");
 +        }
 +        this.sqlfDisconnectListener.onDisconnect(this);
 +      } catch (VirtualMachineError e) {
 +        SystemFailure.initiateFailure(e);
 +        throw e;
 +      } catch (Throwable e) {
 +        SystemFailure.checkFailure();
 +        // TODO: should these be logged or ignored?  We need to see them
 +        logger.info("", e);
 +      }
 +      if (isDebugEnabled) {
 +        logger.debug("finished notifying sql disconnect listener");
 +      }
 +    }
 +  }
 +  
 +  
 +
 +  /**
 +   * Makes note of a <code>DisconnectListener</code> whose
 +   * <code>onDisconnect</code> method will be invoked when this
 +   * connection to the distributed system is disconnected.
 +   */
 +  public void addDisconnectListener(DisconnectListener listener) {
 +    synchronized (this.listeners) {
 +      this.listeners.add(listener);
 +
 +      Boolean disconnectListenerThreadBoolean = 
 +        (Boolean) disconnectListenerThread.get();
 +
 +      if (disconnectListenerThreadBoolean == null ||
 +          !disconnectListenerThreadBoolean.booleanValue()) {
 +        // Don't add disconnect listener after messaging has been disabled.
 +        // Do this test _after_ adding the listener to narrow the window.
 +        // It's possible to miss it still and never invoke the listener, but
 +        // other shutdown conditions will presumably get flagged.
 +        String reason = this.stopper.cancelInProgress();
 +        if (reason != null) {
 +          this.listeners.remove(listener); // don't leave in the list!
 +          throw new DistributedSystemDisconnectedException(LocalizedStrings.InternalDistributedSystem_NO_LISTENERS_PERMITTED_AFTER_SHUTDOWN_0.toLocalizedString(reason), dm.getRootCause());
 +        }
 +      }
 +    } // synchronized
 +  }
 +  
 +  /**
 +   * A non-null value of Boolean.TRUE will identify a thread being used to
 +   * execute disconnectListeners. {@link #addDisconnectListener} will
 +   * not throw ShutdownException if the value is Boolean.TRUE.
 +   */
 +  final ThreadLocal disconnectListenerThread = new ThreadLocal();
 +
 +  /**
 +   * Removes a <code>DisconnectListener</code> from the list of
 +   * listeners that will be notified when this connection to the
 +   * distributed system is disconnected.
 +   * @return true if listener was in the list
 +   */
 +  public boolean removeDisconnectListener(DisconnectListener listener) {
 +    synchronized (this.listeners) {
 +      return this.listeners.remove(listener);
 +    }
 +  }
 +
 +  /**
 +   * Returns any existing <code>InternalDistributedSystem</code> instance.
 +   * Returns <code>null</code> if no instance exists.
 +   */
 +  public static InternalDistributedSystem getAnyInstance() {
 +    List l = existingSystems;
 +    if (l.isEmpty()) {
 +      return null;
 +    }
 +    else {
 +      return (InternalDistributedSystem)l.get(0);
 +    }
 +  }
 +  /**
 +   * Test hook
 +   */
 +  public static List getExistingSystems() {
 +    return existingSystems;
 +  }
 +
 +  @Override
 +  public Properties getProperties() {
 +    return this.config.toProperties();
 +  }
 +  
 +  @Override
 +  public Properties getSecurityProperties() {
 +    return this.config.getSecurityProps();
 +  }
 +
 +  /**
 +   * Fires an "informational" <code>SystemMembershipEvent</code> in
 +   * admin VMs.
 +   *
 +   * @since 4.0
 +   */
 +  public void fireInfoEvent(Object callback) {
 +    throw new UnsupportedOperationException(LocalizedStrings.InternalDistributedSystem_NOT_IMPLEMENTED_YET.toLocalizedString());
 +  }
 +
 +  /**
 +   * Installs a shutdown hook to ensure
 +   * that we are disconnected if an application VM shuts down
 +   * without first calling disconnect itself.
 +     */
 +  public static final Thread shutdownHook;
 +
 +  static {
 +    // Create a shutdown hook to cleanly close connection if
 +    // VM shuts down with an open connection.
 +    ThreadGroup tg = LoggingThreadGroup.createThreadGroup(SHUTDOWN_HOOK_NAME);
 +    Thread tmp_shutdownHook = null;
 +    try {
 +      //Added for bug 38407
 +      if( ! Boolean.getBoolean(DISABLE_SHUTDOWN_HOOK_PROPERTY)) {
 +        tmp_shutdownHook = new Thread(tg, new Runnable() {
 +          public void run() {
 +            DistributedSystem ds = InternalDistributedSystem.getAnyInstance();
 +            setThreadsSocketPolicy(true /* conserve sockets */);
 +            if (ds != null && ds.isConnected()) {
 +              LogWriterI18n log = ((InternalDistributedSystem)ds).getInternalLogWriter();
 +              log.info(LocalizedStrings.InternalDistributedSystem_shutdownHook_shuttingdown);
 +              DurableClientAttributes dca = ((InternalDistributedSystem)ds).getDistributedMember().getDurableClientAttributes();
 +              boolean isDurableClient = false;
 +              
 +              if(dca != null) {
 +                  isDurableClient = ((dca.getId() == null || dca.getId().isEmpty()) ? false : true);
 +              }
 +                            	 
 +              ((InternalDistributedSystem)ds).disconnect(false,
 +                  LocalizedStrings.InternalDistributedSystem_NORMAL_DISCONNECT
 +                      .toLocalizedString(), isDurableClient/*keep alive drive from this*/);
 +              // this was how we wanted to do it for 5.7, but there were shutdown
 +              // issues in PR/dlock (see bug 39287)
 +//              InternalDistributedSystem ids = (InternalDistributedSystem)ds;
 +//              if (ids.getDistributionManager() != null &&
 +//                  ids.getDistributionManager().getMembershipManager() != null) {
 +//                ids.getDistributionManager().getMembershipManager()
 +//                  .uncleanShutdown("VM is exiting", null);
 +//              }
 +            }
 +          }
 +        }, SHUTDOWN_HOOK_NAME);
 +        Runtime.getRuntime().addShutdownHook(tmp_shutdownHook);
 +      }
 +    } finally {
 +      shutdownHook = tmp_shutdownHook;
 +    }
 +  }
 +  ///////////////////////  Inner Classes  ///////////////////////
 +
 +  /**
 +   * A listener that gets invoked before this connection to the
 +   * distributed system is disconnected.
 +   */
 +  public interface DisconnectListener {
 +
 +    /**
 +     * Invoked before a connection to the distributed system is
 +     * disconnected.
 +     *
 +     * @param sys the the system we are disconnecting from
 +     * process should take before returning.
 +     */
 +    public void onDisconnect(InternalDistributedSystem sys);
 +
 +  }
 +  
 +  /**
 +   * A listener that gets invoked before and after a successful auto-reconnect
 +   */
 +  public interface ReconnectListener {
 +    /**
 +     * Invoked when reconnect attempts are initiated
 +     * 
 +     * @param oldSystem the old DS, which is in a partially disconnected state
 +     * and cannot be used for messaging
 +     */
 +    public void reconnecting(InternalDistributedSystem oldSystem);
 +    
 +    /**
 +     * Invoked after a reconnect to the distributed system
 +     * @param oldSystem the old DS
 +     * @param newSystem the new DS
 +     */
 +    public void onReconnect(InternalDistributedSystem oldSystem, InternalDistributedSystem newSystem);
 +  }
 +
 +  /**
 +   * A listener that gets invoked after this connection to the
 +   * distributed system is disconnected
 +   * @author jpenney
 +   *
 +   */
 +  public interface ShutdownListener extends DisconnectListener {
 +    /**
 +     * Invoked after the connection to the distributed system has
 +     * been disconnected
 +     * @param sys
 +     */
 +    public void onShutdown(InternalDistributedSystem sys);
 +  }
 +
 +  /**
 +   * Integer representing number of tries already made
 +   * to reconnect and that failed.
 +   * */
 +  private volatile static int reconnectAttemptCounter = 0;
 +  public static int getReconnectAttemptCounter() {
 +    return reconnectAttemptCounter;
 +  }
 +  
 +  /**
 +   * The time at which reconnect attempts last began
 +   */
 +  private static long reconnectAttemptTime;
 +
 +  /**
 +   * Boolean indicating if DS needs to reconnect and reconnect
 +   * is in progress.
 +   * */
 +  private volatile boolean attemptingToReconnect = false;
 +  
 +  /**
 +   * Boolean indicating this DS joined through a reconnect attempt
 +   */
 +  private volatile boolean reconnected = false;
 +  
 +  /**
 +   * Boolean indicating that this member has been shunned by other members
 +   * or a network partition has occurred
 +   */
 +  private volatile boolean forcedDisconnect = false;
 +
 +  /**
 +   * Used to keep track of the DS created by doing an reconnect on this.
 +   */
 +  private volatile InternalDistributedSystem reconnectDS;
 +  /**
 +   * Was this distributed system started with FORCE_LOCATOR_DM_TYPE=true?
 +   * We need to know when reconnecting.
 +   */
 +  private boolean locatorDMTypeForced;
 +  
 +  
 +  /**
 +   * Returns true if we are reconnecting the distributed system or
 +   * reconnect has completed.  If this returns true it means that
 +   * this instance of the DS is now disconnected and unusable.
 +   */
 +  public boolean isReconnecting(){
 +    return attemptingToReconnect || (reconnectDS != null);
 +  }
 +  
 +  /**
 +   * Returns true if we are reconnecting the distributed system
 +   * and this instance was created for one of the connection
 +   * attempts.  If the connection succeeds this state is cleared
 +   * and this method will commence to return false.
 +   */
 +  public boolean isReconnectingDS() {
 +    return this.isReconnectingDS;
 +  }
 +  
 +  /**
 +   * returns the membership socket of the old
 +   * distributed system, if available, when
 +   * isReconnectingDS returns true.  This is
 +   * used to connect the new DM to the distributed
 +   * system through RemoteTransportConfig.
 +   */
 +  public Object oldDSMembershipInfo() {
 +    if (this.quorumChecker != null) {
 +      return this.quorumChecker.getMembershipInfo();
 +    }
 +    return null;
 +  }
 +  
 +  /**
 +   * Returns true if this DS reconnected to the distributed system after
 +   * a forced disconnect or loss of required-roles
 +   */
 +  public boolean reconnected() {
 +    return this.reconnected;
 +  }
 +  
 +  /**
 +   * Returns true if this DS has been kicked out of the distributed system
 +   */
 +  public boolean forcedDisconnect() {
 +    return this.forcedDisconnect;
 +  }
 +
 +  /**
 +   * If true then this DS will never reconnect.
 +   */
 +  private boolean reconnectCancelled = false;
 +  private Object reconnectCancelledLock = new Object();
 +
 +  /** Make sure this instance of DS never does a reconnect.
 +   * Also if reconnect is in progress cancel it.
 +   */
 +  public void cancelReconnect() {
 +//    (new ManagerLogWriter(LogWriterImpl.FINE_LEVEL, System.out)).fine("cancelReconnect invoked", new Exception("stack trace"));
 +    synchronized(this.reconnectCancelledLock) {
 +      this.reconnectCancelled = t

<TRUNCATED>