Posted to commits@geode.apache.org by up...@apache.org on 2016/02/22 19:36:41 UTC

[79/83] [abbrv] incubator-geode git commit: GEODE-917: Merge branch 'feature/GEODE-917' into develop

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java
index 0000000,51bfb53..24b0697
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java
@@@ -1,0 -1,4811 +1,4811 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.distributed.internal;
+ 
+ import java.io.Externalizable;
+ import java.io.IOException;
+ import java.io.NotSerializableException;
+ import java.io.ObjectInput;
+ import java.io.ObjectOutput;
+ import java.io.Serializable;
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Date;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.NoSuchElementException;
+ import java.util.Set;
+ import java.util.concurrent.BlockingQueue;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.Executor;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.Semaphore;
+ import java.util.concurrent.SynchronousQueue;
+ import java.util.concurrent.ThreadFactory;
+ import java.util.concurrent.ThreadPoolExecutor;
+ import java.util.concurrent.TimeUnit;
+ 
+ import org.apache.logging.log4j.Logger;
+ 
+ import com.gemstone.gemfire.CancelCriterion;
+ import com.gemstone.gemfire.CancelException;
+ import com.gemstone.gemfire.ForcedDisconnectException;
+ import com.gemstone.gemfire.IncompatibleSystemException;
+ import com.gemstone.gemfire.InternalGemFireError;
+ import com.gemstone.gemfire.InternalGemFireException;
+ import com.gemstone.gemfire.InvalidDeltaException;
+ import com.gemstone.gemfire.SystemConnectException;
+ import com.gemstone.gemfire.SystemFailure;
+ import com.gemstone.gemfire.ToDataException;
+ import com.gemstone.gemfire.admin.GemFireHealthConfig;
+ import com.gemstone.gemfire.distributed.DistributedMember;
+ import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
+ import com.gemstone.gemfire.distributed.Locator;
+ import com.gemstone.gemfire.distributed.Role;
+ import com.gemstone.gemfire.distributed.internal.locks.ElderState;
+ import com.gemstone.gemfire.distributed.internal.membership.DistributedMembershipListener;
+ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+ import com.gemstone.gemfire.distributed.internal.membership.MemberFactory;
+ import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
+ import com.gemstone.gemfire.distributed.internal.membership.NetView;
+ import com.gemstone.gemfire.i18n.StringId;
+ import com.gemstone.gemfire.internal.Assert;
+ import com.gemstone.gemfire.internal.NanoTimer;
+ import com.gemstone.gemfire.internal.OSProcess;
+ import com.gemstone.gemfire.internal.SetUtils;
+ import com.gemstone.gemfire.internal.SocketCreator;
+ import com.gemstone.gemfire.internal.Version;
+ import com.gemstone.gemfire.internal.admin.remote.AdminConsoleDisconnectMessage;
+ import com.gemstone.gemfire.internal.admin.remote.RemoteGfManagerAgent;
+ import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
+ import com.gemstone.gemfire.internal.cache.InitialImageOperation;
+ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+ import com.gemstone.gemfire.internal.logging.LogService;
+ import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+ import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
+ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+ import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+ import com.gemstone.gemfire.internal.sequencelog.MembershipLogger;
+ import com.gemstone.gemfire.internal.tcp.Connection;
+ import com.gemstone.gemfire.internal.tcp.ConnectionTable;
+ import com.gemstone.gemfire.internal.tcp.ReenteredConnectException;
+ import com.gemstone.gemfire.internal.util.concurrent.StoppableReentrantLock;
+ 
+ /**
+  * The <code>DistributionManager</code> uses a {@link
+  * MembershipManager} to distribute {@link DistributionMessage messages}
+  * queued in {@link MQueue}s.
+  *
+  * <P>
+  *
+  * Code that wishes to send a {@link DistributionMessage} must get
+  * the <code>DistributionManager</code> and invoke {@link
+  * #putOutgoing}.
+  *
+  * <P>
+  *
+  * <code>DistributionManager</code> is not intended to be
+  * serialized.  It is <code>Externalizable</code> only to prevent it
+  * from being copy shared.  See {@link #writeExternal}.
+  *
+  * <P>
+  *
+  * Prior to GemFire 4.0, <code>DistributionManager</code> was an
+  * abstract class with two concrete subclasses,
+  * <code>LocalDistributionManager</code> and
+  * <code>ConsoleDistributionManager</code>.  We decided that
+  * <code>ConsoleDistributionManager</code> (which was used for the GUI
+  * console and admin APIs) did not offer enough interesting
+  * functionality to warrant a separate class.  More importantly, it
+  * prevented the Cache and admin APIs from being used in the same VM.
+  * So, we refactored the code of those two subclasses into
+  * <code>DistributionManager</code>.
+  *
+  * @author David Whitlock
+  * @since 2.0
+  *
+  * @see DistributionMessage#process
+  * @see IgnoredByManager
+  */
+ public class DistributionManager
+   implements Externalizable, DM {
+ 
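+   // Illustrative usage sketch: code that wants to send a message obtains the DM and calls
+   // putOutgoing, roughly along these lines (SomeDistributionMessage and targetMember are
+   // hypothetical placeholders; only putOutgoing is named by the class javadoc above):
+   //   DM dm = InternalDistributedSystem.getConnectedInstance().getDistributionManager();
+   //   DistributionMessage msg = new SomeDistributionMessage(); // hypothetical subclass
+   //   msg.setRecipient(targetMember);                          // an InternalDistributedMember
+   //   dm.putOutgoing(msg);
+ 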
+   private static final Logger logger = LogService.getLogger();
+   
+   private static final boolean SYNC_EVENTS =
+     Boolean.getBoolean("DistributionManager.syncEvents");
+ 
+   /**
+    * WARNING: setting this to true may break dunit tests.
+    * <p>see com.gemstone.gemfire.cache30.ClearMultiVmCallBkDUnitTest
+    */
+   public static final boolean INLINE_PROCESS = 
+     !Boolean.getBoolean("DistributionManager.enqueueOrderedMessages");
+ 
+   /** Flag indicating whether to use a single serial-executor thread or 
+    * multiple serial-executor threads. 
+    */
+   public static final boolean MULTI_SERIAL_EXECUTORS = 
+     !Boolean.getBoolean("DistributionManager.singleSerialExecutor");
+ 
+   /** The name of the distribution manager (identifies it in GemFire) */
+   public static final String NAME = "GemFire";
+   
+   /** The number of milliseconds to wait for distribution-related
+    * things to happen */
+   public static final long TIMEOUT =
+     Long.getLong("DistributionManager.TIMEOUT", -1).longValue();
+ 
+   public static final int PUSHER_THREADS =
+     Integer.getInteger("DistributionManager.PUSHER_THREADS", 50).intValue();
+ 
+   public static final int PUSHER_QUEUE_SIZE =
+     Integer.getInteger("DistributionManager.PUSHER_QUEUE_SIZE", 4096).intValue();
+ 
+   
+   public static final int MAX_WAITING_THREADS = 
+     Integer.getInteger("DistributionManager.MAX_WAITING_THREADS", Integer.MAX_VALUE).intValue();
+   
+   public static final int MAX_PR_META_DATA_CLEANUP_THREADS = 
+       Integer.getInteger("DistributionManager.MAX_PR_META_DATA_CLEANUP_THREADS", 1).intValue();
+ 
+   public static final int MAX_THREADS = Integer.getInteger("DistributionManager.MAX_THREADS", 100).intValue();
+   public static final int MAX_PR_THREADS = Integer.getInteger("DistributionManager.MAX_PR_THREADS", Math.max(Runtime.getRuntime().availableProcessors()*4, 16)).intValue();
+   public static final int MAX_FE_THREADS = Integer.getInteger("DistributionManager.MAX_FE_THREADS", Math.max(Runtime.getRuntime().availableProcessors()*4, 16)).intValue();
+   //    Integer.getInteger("DistributionManager.MAX_THREADS", max(Runtime.getRuntime().availableProcessors()*2, 2)).intValue();
+ 
+   public static final int INCOMING_QUEUE_LIMIT =
+     Integer.getInteger("DistributionManager.INCOMING_QUEUE_LIMIT", 80000).intValue();
+   public static final int INCOMING_QUEUE_THROTTLE =
+     Integer.getInteger("DistributionManager.INCOMING_QUEUE_THROTTLE", (int)(INCOMING_QUEUE_LIMIT * 0.75)).intValue();
+ 
+   /** Throttling based on the Queue byte size */
+   public static final double THROTTLE_PERCENT =
+     (double) (Integer.getInteger("DistributionManager.SERIAL_QUEUE_THROTTLE_PERCENT", 75).intValue())/100;
+   public static final int SERIAL_QUEUE_BYTE_LIMIT =
+     Integer.getInteger("DistributionManager.SERIAL_QUEUE_BYTE_LIMIT", (40 * (1024 * 1024))).intValue();
+   public static final int SERIAL_QUEUE_THROTTLE =
+     Integer.getInteger("DistributionManager.SERIAL_QUEUE_THROTTLE", (int)(SERIAL_QUEUE_BYTE_LIMIT * THROTTLE_PERCENT)).intValue();
+   public static final int TOTAL_SERIAL_QUEUE_BYTE_LIMIT =
+     Integer.getInteger("DistributionManager.TOTAL_SERIAL_QUEUE_BYTE_LIMIT", (80 * (1024 * 1024))).intValue();
+   public static final int TOTAL_SERIAL_QUEUE_THROTTLE =
+     Integer.getInteger("DistributionManager.TOTAL_SERIAL_QUEUE_THROTTLE", (int)(SERIAL_QUEUE_BYTE_LIMIT * THROTTLE_PERCENT)).intValue();
+   
+   /** Throttling based on the Queue item size */
+   public static final int SERIAL_QUEUE_SIZE_LIMIT =
+     Integer.getInteger("DistributionManager.SERIAL_QUEUE_SIZE_LIMIT", 20000).intValue();
+   public static final int SERIAL_QUEUE_SIZE_THROTTLE =
+     Integer.getInteger("DistributionManager.SERIAL_QUEUE_SIZE_THROTTLE", (int)(SERIAL_QUEUE_SIZE_LIMIT * THROTTLE_PERCENT)).intValue();
+ 
+   /** Max number of serial Queue executors, in case of multi-serial-queue executor */
+   public static final int MAX_SERIAL_QUEUE_THREAD =
+     Integer.getInteger("DistributionManager.MAX_SERIAL_QUEUE_THREAD", 20).intValue();
+ 
+   /**
+    * Whether or not to include link-local addresses in the list of addresses we use
+    * to determine if two members are on the same host.
+    * 
+    * Added for normura issue 7033 - they have duplicate link local addresses on different boxes
+    */
+   public static volatile boolean INCLUDE_LINK_LOCAL_ADDRESSES = 
+     Boolean.getBoolean("gemfire.IncludeLinkLocalAddresses");
+   
+   /** The DM type for regular distribution managers */
+   public static final int NORMAL_DM_TYPE = 10;
+ 
+   /** The DM type for locator distribution managers 
+    * @since 7.0
+    */
+   public static final int LOCATOR_DM_TYPE = 11;
+ 
+   /** The DM type for Console (admin-only) distribution managers */
+   public static final int ADMIN_ONLY_DM_TYPE = 12;
+ 
+   public static final int LONER_DM_TYPE = 13;
+ 
+   /**
+    * an NIO priority type
+    * @see com.gemstone.gemfire.distributed.internal.PooledDistributionMessage
+    * @see #SERIAL_EXECUTOR
+    * @see #HIGH_PRIORITY_EXECUTOR
+    * @see #WAITING_POOL_EXECUTOR
+    */
+   public static final int STANDARD_EXECUTOR = 73;
+ 
+   /**
+    * an NIO priority type
+    * 
+    * @see com.gemstone.gemfire.distributed.internal.SerialDistributionMessage
+    * @see #STANDARD_EXECUTOR
+    */
+   public static final int SERIAL_EXECUTOR = 74;
+ 
+   /**
+    * an NIO priority type
+    * 
+    * @see com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage
+    * @see #STANDARD_EXECUTOR
+    */
+   public static final int HIGH_PRIORITY_EXECUTOR = 75;
+   
+   // 76 not in use
+ 
+   /**
+    * an NIO priority type
+    * 
+    * @see com.gemstone.gemfire.internal.cache.InitialImageOperation
+    * @see #STANDARD_EXECUTOR
+    */
+   public static final int WAITING_POOL_EXECUTOR = 77;
+ 
+   /**
+    * an NIO priority type
+    * 
+    * @see com.gemstone.gemfire.internal.cache.InitialImageOperation
+    * @see #STANDARD_EXECUTOR
+    */
+   public static final int PARTITIONED_REGION_EXECUTOR = 78;
+ 
+   
+   /**
+    * Executor for view related messages
+    * 
+    * @see com.gemstone.gemfire.distributed.internal.membership.gms.messages.ViewAckMessage
+    * @see #STANDARD_EXECUTOR
+    */
+   public static final int VIEW_EXECUTOR = 79;
+ 
+ 
+   public static final int REGION_FUNCTION_EXECUTION_EXECUTOR = 80;
+ 
+   /** The number of open distribution managers in this VM */
+   private static int openDMs = 0;
+ 
+ //  /** The stack trace of the last time a console DM was opened */
+ //  private static Exception openStackTrace;
+ 
+   /** Is this VM dedicated to administration (like a GUI console or a
+    * JMX agent)?  If so, then it creates {@link #ADMIN_ONLY_DM_TYPE}
+    * type distribution managers.
+    *
+    * @since 4.0 
+    */
+   public static volatile boolean isDedicatedAdminVM = false;
+   
+   /**
+    * Is this admin agent used for a command-line console?
+    * This flag controls whether connect will throw 
+    * an exception or just wait for a DS if one is not
+    * available. If true, we will throw an exception.
+    * 
+    */
+   public static volatile boolean isCommandLineAdminVM = false;
+   
+   
+   
+   /////////////////////  Instance Fields  //////////////////////
+ 
+   /** The id of this distribution manager */
+   final protected InternalDistributedMember myid;
+ 
+   /** The distribution manager type of this dm; set in its constructor. */
+   private final int dmType;
+ 
+   /** The <code>MembershipListener</code>s that are registered on this
+    * manager. */
+   private final ConcurrentMap membershipListeners;
+ 
+   /** A lock to hold while adding and removing membership listeners */
+   protected final Object membershipListenersLock =
+     new MembershipListenersLock();
+   /** The <code>MembershipListener</code>s that are registered on this
+    * manager for ALL members.
+    * @since 5.7
+    */
+   protected volatile Set allMembershipListeners = Collections.EMPTY_SET;
+   /**
+    * A lock to hold while adding and removing all membership listeners.
+    * @since 5.7
+    */
+   protected final Object allMembershipListenersLock =
+     new MembershipListenersLock();
+   /** A queue of MemberEvent instances */
+   protected final BlockingQueue membershipEventQueue =
+     new LinkedBlockingQueue();
+   /** Used to invoke registered membership listeners in the background. */
+   private Thread memberEventThread;
+ 
+ 
+   /** A brief description of this DistributionManager */
+   protected final String description;
+ 
+   /** Statistics about distribution */
+   protected /*final*/ DistributionStats stats;
+ 
+   /** Did an exception occur in one of the DM threads? */
+   protected boolean exceptionInThreads;
+ 
+   static ThreadLocal isStartupThread = new ThreadLocal();
+     
+   protected volatile boolean shutdownMsgSent = false;
+ 
+   /** Set to true when this manager is being shutdown */
+   protected volatile boolean closeInProgress = false;
+   
+   private volatile boolean receivedStartupResponse = false;
+   
+   private volatile String rejectionMessage = null;
+ 
+   protected MembershipManager membershipManager;
+   
+   /** The channel through which distributed communication occurs. */
+   protected DistributionChannel channel;
+ 
+   /**
+    * The (non-admin-only) members of the distributed system.  This is a
+    * map of memberid->memberid for fast access to canonical ID references.
+    * All accesses to this 
+    * field must be synchronized on {@link #membersLock}.
+    */
+   private Map<InternalDistributedMember,InternalDistributedMember> members = Collections.emptyMap();
+   /** 
+    * All (admin and non-admin) members of the distributed system. All accesses 
+    * to this field must be synchronized on {@link #membersLock}.
+    */
+   private Set membersAndAdmin = Collections.emptySet();
+   /** 
+    * Map of all locator members of the distributed system. The value is a
+    * collection of locator strings that are hosted in that member. All accesses 
+    * to this field must be synchronized on {@link #membersLock}.
+    */
+   private Map<InternalDistributedMember, Collection<String>> hostedLocatorsAll = Collections.emptyMap();
+   
+   /** 
+    * Map of all locator members of the distributed system which have the shared configuration. The value is a
+    * collection of locator strings that are hosted in that member. All accesses 
+    * to this field must be synchronized on {@link #membersLock}.
+    */
+   private Map<InternalDistributedMember, Collection<String>> hostedLocatorsWithSharedConfiguration = Collections.emptyMap();
+ 
+   /**
+    * Since 6.6.2 we save member versions here. Pre-6.6.2 StartupResponseMessages
+    * do not contain a version, so those members are assigned this default version.
+    */
+   public static final String DEFAULT_VERSION_PRE_6_6_2 = "6.6.0.0";
+   /** 
+    * The lock held while accessing the field references to the following:<br>
+    * 1) {@link #members}<br>
+    * 2) {@link #membersAndAdmin}<br>
+    * 3) {@link #hostedLocatorsAll}<br>
+    * 4) {@link #hostedLocatorsWithSharedConfiguration}<br>
+    */
+   private final Object membersLock = new MembersLock();
+ 
+   /**
+    * The lock held while writing {@link #adminConsoles}.
+    */
+   private final Object adminConsolesLock = new Object();
+   /** 
+    * The ids of all known admin consoles
+    * Uses Copy on Write. Writers must sync on adminConsolesLock.
+    * Readers don't need to sync.
+    */
+   private volatile Set<InternalDistributedMember> adminConsoles = Collections.emptySet();
+ 
+   /** The pusher thread */
+   //private Thread pusher;
+ 
+   /** The group of distribution manager threads */
+   protected LoggingThreadGroup threadGroup;
+   
+   /** Message processing thread pool */
+   private ThreadPoolExecutor threadPool;
+ 
+   /** High Priority processing thread pool, used for initializing messages
+    *  such as UpdateAttributes and CreateRegion messages
+    */
+   private ThreadPoolExecutor highPriorityPool;
+   
+   /** Waiting Pool, used for messages that may have to wait on something.
+    *  Use this separate pool with an unbounded queue so that waiting
+    *  runnables don't get in the way of other processing threads.
+    *  Used for threads that will most likely have to wait for a region to
+    *  finish initializing before they can proceed.
+    */
+   private ThreadPoolExecutor waitingPool;
+   
+   private ThreadPoolExecutor prMetaDataCleanupThreadPool;
+   
+   /**
+    * Thread used to decouple {@link com.gemstone.gemfire.internal.cache.partitioned.PartitionMessage}s from 
+    * {@link com.gemstone.gemfire.internal.cache.DistributedCacheOperation}s
+    * @see #SERIAL_EXECUTOR
+    */
+   private ThreadPoolExecutor partitionedRegionThread;
+   private ThreadPoolExecutor partitionedRegionPool;
+   private ThreadPoolExecutor functionExecutionThread;
+   private ThreadPoolExecutor functionExecutionPool;
+ 
+   /** Message processing executor for serial, ordered, messages. */
+   private ThreadPoolExecutor serialThread;
+   
+   /** Message processing executor for view messages
+    * @see com.gemstone.gemfire.distributed.internal.membership.gms.messages.ViewAckMessage 
+    */
+   private ThreadPoolExecutor viewThread;
+   
+   /** If using a throttling queue for the serialThread, we cache the queue here
+       so we can see if delivery would block */
+   private ThrottlingMemLinkedQueueWithDMStats serialQueue;
+ 
+   /** a map keyed on InternalDistributedMember, to direct channels to other systems */
+   //protected final Map channelMap = CFactory.createCM();
+ 
+   protected volatile boolean readyForMessages = false;
+ 
+   /**
+    * Set to true once this DM is ready to send messages.
+    * Note that it is always ready to send the startup message.
+    */
+   private volatile boolean readyToSendMsgs = false;
+   private final Object readyToSendMsgsLock = new Object();
+ 
+   /** Is this distribution manager closed? */
+   protected volatile boolean closed = false;
+ 
+   /** The distributed system to which this distribution manager is
+    * connected. */
+   private InternalDistributedSystem system;
+ 
+   /** The remote transport configuration for this dm */
+   private RemoteTransportConfig transport;
+ 
+   /** The administration agent associated with this distribution
+    * manager. */
+   private volatile RemoteGfManagerAgent agent;
+ 
+   private SerialQueuedExecutorPool serialQueuedExecutorPool;
+   
+   private final Semaphore parallelGIIs = new Semaphore(InitialImageOperation.MAX_PARALLEL_GIIS);
+ 
+   /**
+    * Map of InetAddress to HashSets of InetAddress, to define equivalences
+    * between network interface cards and hosts.
+    */
+   private final HashMap<InetAddress, Set<InetAddress>> equivalentHosts = new HashMap<InetAddress, Set<InetAddress>>();
+ 
+   private int distributedSystemId = DistributionConfig.DEFAULT_DISTRIBUTED_SYSTEM_ID;
+   
+   
+   private final Map<InternalDistributedMember, String> redundancyZones = Collections.synchronizedMap(new HashMap<InternalDistributedMember, String>());
+   
+   private boolean enforceUniqueZone = false;
+ 
+   private volatile boolean isSharedConfigEnabledForDS = false;
+ 
+   @Override
+   public boolean isSharedConfigurationServiceEnabledForDS() {
+     return isSharedConfigEnabledForDS;
+   }
+ 
+   /**
+    * Identifier for function execution threads and any of their children
+    */
+   public static final InheritableThreadLocal<Boolean> isFunctionExecutionThread = new InheritableThreadLocal<Boolean>() {
+     @Override
+     protected Boolean initialValue() {
+       return Boolean.FALSE;
+     }
+   };
+   //////////////////////  Static Methods  //////////////////////
+ 
+   /**
+    * Given two DistributionManager ids, check to see if they are
+    * from the same host address.
+    * @param id1 a DistributionManager id
+    * @param id2 a DistributionManager id
+    * @return true if id1 and id2 are from the same host, false otherwise
+    */
+   public static boolean isSameHost(InternalDistributedMember id1, InternalDistributedMember id2) {
+     return (id1.getInetAddress().equals(id2.getInetAddress()));
+   }
+ 
+   // @todo davidw Modify JGroups so that we do not have to send out a
+   // {@link StartupMessage}
+   /**
+    * Creates a new distribution manager and discovers the other members of the
+    * distributed system. Note that it does not check to see whether or not this
+    * VM already has a distribution manager.
+    * 
+    * @param system
+    *                The distributed system to which this distribution manager
+    *                will send messages.
+    */
+   public static DistributionManager create(InternalDistributedSystem system)
+   {
+ 
+     DistributionManager dm = null;
+     
+     try {
+ 
+       int vmKind;
+       
+       if (Boolean.getBoolean(InternalLocator.FORCE_LOCATOR_DM_TYPE)) {
+         // if this DM is starting for a locator, set it to be a locator DM
+         vmKind = LOCATOR_DM_TYPE;
+ 
+       } else if (isDedicatedAdminVM) {
+         vmKind = ADMIN_ONLY_DM_TYPE;
+ 
+       } else {
+         vmKind = NORMAL_DM_TYPE;
+       }
+     
+       RemoteTransportConfig transport = new RemoteTransportConfig(system.getConfig(), vmKind);
+       transport.setIsReconnectingDS(system.isReconnectingDS());
+       transport.setOldDSMembershipInfo(system.oldDSMembershipInfo());
+       
+       long start = System.currentTimeMillis();
+ 
+       dm = new DistributionManager(system, transport);
+       dm.assertDistributionManagerType();
+ 
+       {
+         InternalDistributedMember id = dm.getDistributionManagerId();
+         if (!"".equals(id.getName())) {
+           for (InternalDistributedMember m: (List<InternalDistributedMember>)dm.getViewMembers()) {
+             if (m.equals(id)) {
+               // I'm counting on the members returned by getViewMembers being ordered such that
+               // members that joined before us will precede us AND members that join after us
+               // will succeed us.
+               // So once we find ourselves, break out of this loop.
+               break;
+             }
+             if (id.getName().equals(m.getName())) {
+               if (dm.getMembershipManager().verifyMember(m, "member is using the name of " + id)) {
+                 throw new IncompatibleSystemException("Member " + id + " could not join this distributed system because the existing member " + m + " used the same name. Set the \"name\" gemfire property to a unique value.");
+               }
+             }
+           }
+         }
+         dm.addNewMember(id); // add ourselves
+         dm.selectElder(); // ShutdownException could be thrown here
+       }
+ 
+       // Send out a StartupMessage to the other members.
+       StartupOperation op = new StartupOperation(dm, transport);
+ 
+       try {
+         if (!dm.sendStartupMessage(op, true)) {
+           // Well, we didn't hear back from anyone else.  We assume that
+           // we're the first one.
+           if (dm.getOtherDistributionManagerIds().size() == 0) {
+             logger.info(LocalizedMessage.create(LocalizedStrings.DistributionManager_DIDNT_HEAR_BACK_FROM_ANY_OTHER_SYSTEM_I_AM_THE_FIRST_ONE));
+           } else if (transport.isMcastEnabled()) {
+             // perform a multicast ping test
+             if (!dm.testMulticast()) {
+               logger.warn(LocalizedMessage.create(
+                   LocalizedStrings.DistributionManager_RECEIVED_NO_STARTUP_RESPONSES_BUT_OTHER_MEMBERS_EXIST_MULTICAST_IS_NOT_RESPONSIVE));
+             }
+           }
+         }
+       } catch (InterruptedException ex) {
+         Thread.currentThread().interrupt();
+         // This is ALWAYS bad; don't consult a CancelCriterion.
+         throw new InternalGemFireException(LocalizedStrings.DistributionManager_INTERRUPTED_WHILE_WAITING_FOR_FIRST_STARTUPRESPONSEMESSAGE.toLocalizedString(), ex);
+       } catch (IncompatibleSystemException ex) {
+         logger.fatal(ex.getMessage(), ex);
+         throw ex;
+       } finally {
+         dm.readyToSendMsgs();
+       }
+ 
+       if (logger.isInfoEnabled()) {
+         long delta = System.currentTimeMillis() - start;
+         Object[] logArgs = new Object[] {
+             dm.getDistributionManagerId(),
+             transport,
+             Integer.valueOf(dm.getOtherDistributionManagerIds().size()),
+             dm.getOtherDistributionManagerIds(), 
+             (logger.isInfoEnabled(LogMarker.DM) ? " (VERBOSE, took " + delta + " ms)" : ""),
+             ((dm.getDMType() == ADMIN_ONLY_DM_TYPE) ? " (admin only)" : (dm.getDMType() == LOCATOR_DM_TYPE) ? " (locator)" : "")
+         };
+         logger.info(LogMarker.DM, LocalizedMessage.create(
+             LocalizedStrings.DistributionManager_DISTRIBUTIONMANAGER_0_STARTED_ON_1_THERE_WERE_2_OTHER_DMS_3_4_5, logArgs));
+ 
+         MembershipLogger.logStartup(dm.getDistributionManagerId());
+       }
+       return dm;
+     }
+     catch (RuntimeException r) {
+       if (dm != null) {
+         if (logger.isDebugEnabled()) {
+           logger.debug("cleaning up incompletely started DistributionManager due to exception", r); 
+         }
+         dm.uncleanShutdown(true);
+       }
+       throw r;
+     }
+   }
+ 
+   void runUntilShutdown(Runnable r) {
+     try {
+       r.run();
+     }
+     catch (CancelException e) {
+       if (logger.isTraceEnabled()) {
+         logger.trace("Caught shutdown exception", e); 
+       }
+     }
+     catch (VirtualMachineError err) {
+       SystemFailure.initiateFailure(err);
+       // If this ever returns, rethrow the error.  We're poisoned
+       // now, so don't let this thread continue.
+       throw err;
+     }
+     catch (Throwable t) {
+       // Whenever you catch Error or Throwable, you must also
+       // catch VirtualMachineError (see above).  However, there is
+       // _still_ a possibility that you are dealing with a cascading
+       // error condition, so you also need to check to see if the JVM
+       // is still usable:
+       SystemFailure.checkFailure();
+       if (isCloseInProgress()) {
+         logger.debug("Caught unusual exception during shutdown: {}", t.getMessage(), t);
+       }
+       else {
+         logger.warn(LocalizedMessage.create(LocalizedStrings.DistributionManager_TASK_FAILED_WITH_EXCEPTION), t);
+       }
+     }
+   }
+   
+   volatile Throwable rootCause = null;
+   
+   private static class Stopper extends CancelCriterion {
+     private DistributionManager dm;
+     
+     // validateDM is commented out because expiry threads hit it with 
+     // an ugly failure... use only for debugging lingering DM bugs
+ //    private String validateDM() {
+ //      GemFireCache cache = GemFireCache.getInstance();
+ //      if (cache == null) {
+ //        return null; // Distributed system with no cache
+ //      }
+ //      Object obj = cache.getDistributedSystem();
+ //      if (obj == null) {
+ //        return null; // Cache is very dead
+ //      }
+ //      InternalDistributedSystem ids = (InternalDistributedSystem)obj;
+ //      DM current = ids.getDistributionManager();
+ //      if (current != dm) {
+ //        String response = LocalizedStrings.DistributionManager_CURRENT_CACHE_DISTRIBUTIONMANAGER_0_IS_NOT_THE_SAME_AS_1
+ //        .toLocalizedString(new Object[] { current, dm});
+ //        return response; 
+ //      }
+ //      return null;
+ //    }
+     
+     Stopper(DistributionManager dm) {
+       this.dm = dm;  
+     }
+     @Override
+     public String cancelInProgress() {
+       checkFailure();
+ 
+       // remove call to validateDM() to fix bug 38356
+       
+       if (dm.shutdownMsgSent) {
+         return LocalizedStrings.DistributionManager__0_MESSAGE_DISTRIBUTION_HAS_TERMINATED.toLocalizedString(dm.toString());
+       }
+       if (dm.rootCause != null) {
+         return dm.toString() + ": " + dm.rootCause.getMessage();
+       }
+       
+       // Nope.
+       return null;
+     }
+ 
+     @Override
+     public RuntimeException generateCancelledException(Throwable e) {
+       String reason = cancelInProgress();
+       if (reason == null) {
+         return null;
+       }
+       Throwable rc = dm.rootCause; // volatile read
+       if (rc == null) {
+         // No root cause, specify the one given and be done with it.
+         return new DistributedSystemDisconnectedException(reason, e);
+       }
+       
+       if (e == null) {
+         // Caller did not specify  any root cause, so just use our own.
+         return new DistributedSystemDisconnectedException(reason, rc);
+       }
+ 
+       // Attempt to stick rootCause at tail end of the exception chain.
+       Throwable nt = e;
+       while (nt.getCause() != null) {
+         nt = nt.getCause();
+       }
+       if (nt == rc) {
+         // Root cause already in place; we're done
+         return new DistributedSystemDisconnectedException(reason, e);
+       }
+       
+       try {
+         nt.initCause(rc);
+         return new DistributedSystemDisconnectedException(reason, e);
+       }
+       catch (IllegalStateException e2) {
+         // Bug 39496 (Jrockit related)  Give up.  The following
+         // error is not entirely sane but gives the correct general picture.
+         return new DistributedSystemDisconnectedException(reason, rc);
+       }
+     }
+   }
+   private final Stopper stopper = new Stopper(this);
+   
+   public CancelCriterion getCancelCriterion() {
+     return stopper;
+   }
+   
+   ///////////////////////  Constructors  ///////////////////////
+   
+   /**
+    * no-arg constructor for Externalizable
+    * TODO: does this class really need to implement Externalizable?  I
+    * think it only implements that interface for the old copy-sharing
+    * shared-memory stuff that's no longer in GemFire
+    */
+   public DistributionManager() {
+     this.elderLock = null;
+     this.membershipListeners = null;
+     this.myid = null;
+     this.description = null;
+     this.dmType = 0;
+     throw new IllegalAccessError("this constructor should never be invoked");
+   }
+ 
+   /**
+    * Creates a new <code>DistributionManager</code> by initializing
+    * itself and creating the membership manager and executors.
+    *
+    * @param transport
+    *        The configuration for the communications transport
+    * @param system
+    *        The distributed system to which this distribution manager
+    *        will send messages.
+    *
+    */
+   private DistributionManager(RemoteTransportConfig transport,
+                               InternalDistributedSystem system) {
+ 
+     this.dmType = transport.getVmKind();
+     this.system = system;
+     this.elderLock = new StoppableReentrantLock(stopper);
+     this.transport = transport;
+ 
+     this.membershipListeners = new ConcurrentHashMap();
+     this.distributedSystemId = system.getConfig().getDistributedSystemId();
+     {
+       long statId = OSProcess.getId();
+       /* deadcoded since we don't know the channel id yet.
+         if (statId == 0 || statId == -1) {
+         statId = getChannelId();
+         }
+       */
+       this.stats = new DistributionStats(system, statId);
+       DistributionStats.enableClockStats = system.getConfig().getEnableTimeStatistics();
+     }
+ 
+     this.exceptionInThreads = false;
+     
+     // Start the processing threads
+     final LoggingThreadGroup group =
+       LoggingThreadGroup.createThreadGroup("DistributionManager Threads", logger);
+     this.threadGroup = group;
+     
+     boolean finishedConstructor = false;
+     try {
+ 
+     if (MULTI_SERIAL_EXECUTORS) {
+       if (logger.isInfoEnabled(LogMarker.DM)) {
+         logger.info(LogMarker.DM,
+             "Serial Queue info :" + 
+             " THROTTLE_PERCENT: " + THROTTLE_PERCENT +
+             " SERIAL_QUEUE_BYTE_LIMIT :" + SERIAL_QUEUE_BYTE_LIMIT +   
+             " SERIAL_QUEUE_THROTTLE :" + SERIAL_QUEUE_THROTTLE + 
+             " TOTAL_SERIAL_QUEUE_BYTE_LIMIT :" + TOTAL_SERIAL_QUEUE_BYTE_LIMIT +  
+             " TOTAL_SERIAL_QUEUE_THROTTLE :" + TOTAL_SERIAL_QUEUE_THROTTLE + 
+             " SERIAL_QUEUE_SIZE_LIMIT :" + SERIAL_QUEUE_SIZE_LIMIT +
+             " SERIAL_QUEUE_SIZE_THROTTLE :" + SERIAL_QUEUE_SIZE_THROTTLE
+         ); 
+       }
+       //  when TCP/IP is disabled we can't throttle the serial queue or we run the risk of 
+       // distributed deadlock when we block the UDP reader thread
+       boolean throttlingDisabled = system.getConfig().getDisableTcp();
+       this.serialQueuedExecutorPool = new SerialQueuedExecutorPool(this.threadGroup, this.stats, throttlingDisabled);
+     }
+       
+     {
+       BlockingQueue poolQueue;
+       if (SERIAL_QUEUE_BYTE_LIMIT == 0) {
+         poolQueue = new OverflowQueueWithDMStats(this.stats.getSerialQueueHelper());
+       } else {
+         this.serialQueue = new ThrottlingMemLinkedQueueWithDMStats(TOTAL_SERIAL_QUEUE_BYTE_LIMIT, 
+             TOTAL_SERIAL_QUEUE_THROTTLE, SERIAL_QUEUE_SIZE_LIMIT, SERIAL_QUEUE_SIZE_THROTTLE, 
+             this.stats.getSerialQueueHelper());
+         poolQueue = this.serialQueue;
+       }
+       ThreadFactory tf = new ThreadFactory() {
+         public Thread newThread(final Runnable command) {
+           DistributionManager.this.stats.incSerialThreadStarts();
+           final Runnable r = new Runnable() {
+             public void run() {
+               DistributionManager.this.stats.incNumSerialThreads(1);
+               try {
+               ConnectionTable.threadWantsSharedResources();
+               Connection.makeReaderThread();
+               runUntilShutdown(command);
+               // command.run();
+               } finally {
+                 ConnectionTable.releaseThreadsSockets();
+                 DistributionManager.this.stats.incNumSerialThreads(-1);
+               }
+             }
+           };
+           Thread thread = new Thread(group, r, LocalizedStrings.DistributionManager_SERIAL_MESSAGE_PROCESSOR.toLocalizedString());
+           thread.setDaemon(true);
+           return thread;
+         }
+       };
+       SerialQueuedExecutorWithDMStats executor = new SerialQueuedExecutorWithDMStats(poolQueue, 
+           this.stats.getSerialProcessorHelper(), tf);
+       this.serialThread = executor;
+     }
+     {
+       BlockingQueue q = new LinkedBlockingQueue();
+       ThreadFactory tf = new ThreadFactory() {
+           public Thread newThread(final Runnable command) {
+             DistributionManager.this.stats.incViewThreadStarts();
+             final Runnable r = new Runnable() {
+                 public void run() {
+                   DistributionManager.this.stats.incNumViewThreads(1);
+                   try {
+                     ConnectionTable.threadWantsSharedResources();
+                     Connection.makeReaderThread();
+                     runUntilShutdown(command);
+                   } finally {
+                     ConnectionTable.releaseThreadsSockets();
+                     DistributionManager.this.stats.incNumViewThreads(-1);
+                   }
+                 }
+               };
+             Thread thread = new Thread(group, r, LocalizedStrings.DistributionManager_VIEW_MESSAGE_PROCESSOR.toLocalizedString());
+             thread.setDaemon(true);
+             return thread;
+           }
+         };
+       this.viewThread = new SerialQueuedExecutorWithDMStats(q, 
+           this.stats.getViewProcessorHelper(), tf);
+     }
+ 
+     {
+       BlockingQueue poolQueue;
+       if (INCOMING_QUEUE_LIMIT == 0) {
+         poolQueue = new OverflowQueueWithDMStats(this.stats.getOverflowQueueHelper());
+       } else {
+         poolQueue = new OverflowQueueWithDMStats(INCOMING_QUEUE_LIMIT, this.stats.getOverflowQueueHelper());
+       }
+       ThreadFactory tf = new ThreadFactory() {
+           private int next = 0;
+ 
+           public Thread newThread(final Runnable command) {
+             DistributionManager.this.stats.incProcessingThreadStarts();
+             final Runnable r = new Runnable() {
+                 public void run() {
+                   DistributionManager.this.stats.incNumProcessingThreads(1);
+                   try {
+                   ConnectionTable.threadWantsSharedResources();
+                   Connection.makeReaderThread();
+                   runUntilShutdown(command);
+                   } finally {
+                     ConnectionTable.releaseThreadsSockets();
+                     DistributionManager.this.stats.incNumProcessingThreads(-1);
+                   }
+                 }
+               };
+             Thread thread = new Thread(group, r, 
+                LocalizedStrings.DistributionManager_POOLED_MESSAGE_PROCESSOR.toLocalizedString() + (next++));
+             thread.setDaemon(true);
+             return thread;
+           }
+         };
+       ThreadPoolExecutor pool =
+         new PooledExecutorWithDMStats(poolQueue, MAX_THREADS, this.stats.getNormalPoolHelper(), tf);
+       this.threadPool = pool;
+     }
+ 
+ 
+     {
+       BlockingQueue poolQueue;
+       if (INCOMING_QUEUE_LIMIT == 0) {
+         poolQueue = new OverflowQueueWithDMStats(this.stats.getHighPriorityQueueHelper());
+       } else {
+         poolQueue = new OverflowQueueWithDMStats(INCOMING_QUEUE_LIMIT, this.stats.getHighPriorityQueueHelper());
+       }
+       ThreadFactory tf = new ThreadFactory() {
+           private int next = 0;
+ 
+           public Thread newThread(final Runnable command) {
+             DistributionManager.this.stats.incHighPriorityThreadStarts();
+             final Runnable r = new Runnable() {
+                 public void run() {
+                   DistributionManager.this.stats.incHighPriorityThreads(1);
+                   try {
+                     ConnectionTable.threadWantsSharedResources();
+                     Connection.makeReaderThread();
+                     runUntilShutdown(command);
+                   } finally {
+                     ConnectionTable.releaseThreadsSockets();
+                     DistributionManager.this.stats.incHighPriorityThreads(-1);
+                   }
+                 }
+               };
+             Thread thread = new Thread(group, r, 
+                 LocalizedStrings.DistributionManager_POOLED_HIGH_PRIORITY_MESSAGE_PROCESSOR.toLocalizedString() + (next++));
+             thread.setDaemon(true);
+             return thread;
+           }
+         };
+       this.highPriorityPool = new PooledExecutorWithDMStats(poolQueue, MAX_THREADS, this.stats.getHighPriorityPoolHelper(), tf);
+     }
+ 
+ 
+     {
+       ThreadFactory tf = new ThreadFactory() {
+           private int next = 0;
+ 
+           public Thread newThread(final Runnable command) {
+             DistributionManager.this.stats.incWaitingThreadStarts();
+             final Runnable r = new Runnable() {
+                 public void run() {
+                   DistributionManager.this.stats.incWaitingThreads(1);
+                   try {
+                   ConnectionTable.threadWantsSharedResources();
+                   Connection.makeReaderThread();
+                   runUntilShutdown(command);
+                   } finally {
+                    ConnectionTable.releaseThreadsSockets();
+                    DistributionManager.this.stats.incWaitingThreads(-1);
+                   }
+                 }
+               };
+             Thread thread = new Thread(group, r, 
+                                        LocalizedStrings.DistributionManager_POOLED_WAITING_MESSAGE_PROCESSOR.toLocalizedString() + (next++));
+             thread.setDaemon(true);
+             return thread;
+           }
+         };
+       BlockingQueue poolQueue;
+       if (MAX_WAITING_THREADS == Integer.MAX_VALUE) {
+         // no need for a queue since we have infinite threads
+         poolQueue = new SynchronousQueue();
+       } else {
+         poolQueue = new OverflowQueueWithDMStats(this.stats.getWaitingQueueHelper());
+       }
+       this.waitingPool = new PooledExecutorWithDMStats(poolQueue,
+                                                        MAX_WAITING_THREADS,
+                                                        this.stats.getWaitingPoolHelper(),
+                                                        tf);
+     }
+     
+     {
+       ThreadFactory tf = new ThreadFactory() {
+           private int next = 0;
+ 
+           public Thread newThread(final Runnable command) {
+             DistributionManager.this.stats.incWaitingThreadStarts();//will it be ok?
+             final Runnable r = new Runnable() {
+                 public void run() {
+                   DistributionManager.this.stats.incWaitingThreads(1);//will it be ok
+                   try {
+                   ConnectionTable.threadWantsSharedResources();
+                   Connection.makeReaderThread();
+                   runUntilShutdown(command);
+                   } finally {
+                    ConnectionTable.releaseThreadsSockets();
+                    DistributionManager.this.stats.incWaitingThreads(-1);
+                   }
+                 }
+               };
+             Thread thread = new Thread(group, r, 
+                                        LocalizedStrings.DistributionManager_PR_META_DATA_CLEANUP_MESSAGE_PROCESSOR.toLocalizedString() + (next++));
+             thread.setDaemon(true);
+             return thread;
+           }
+         };
+       BlockingQueue poolQueue;
+       poolQueue = new OverflowQueueWithDMStats(this.stats.getWaitingQueueHelper());
+       this.prMetaDataCleanupThreadPool = new PooledExecutorWithDMStats(poolQueue,
+                                                         MAX_PR_META_DATA_CLEANUP_THREADS,
+                                                        this.stats.getWaitingPoolHelper(),
+                                                        tf);
+     }
+ 
+     {
+       BlockingQueue poolQueue;
+       if (INCOMING_QUEUE_LIMIT == 0) {
+         poolQueue = new OverflowQueueWithDMStats(this.stats.getPartitionedRegionQueueHelper());
+       } else {
+         poolQueue = new OverflowQueueWithDMStats(INCOMING_QUEUE_LIMIT, this.stats.getPartitionedRegionQueueHelper());
+       }
+       ThreadFactory tf = new ThreadFactory() {
+         private int next = 0;
+ 
+         public Thread newThread(final Runnable command) {
+           DistributionManager.this.stats.incPartitionedRegionThreadStarts();
+           final Runnable r = new Runnable() {
+               public void run() {
+                 stats.incPartitionedRegionThreads(1);
+                 try {
+                   ConnectionTable.threadWantsSharedResources();
+                   Connection.makeReaderThread();
+                   runUntilShutdown(command);
+                 } finally {
+                   ConnectionTable.releaseThreadsSockets();
+                   stats.incPartitionedRegionThreads(-1);
+                 }
+               }
+             };
+           Thread thread = new Thread(group, r, 
+                                      "PartitionedRegion Message Processor" + (next++));
+           thread.setDaemon(true);
+           return thread;
+         }
+       };
+       if (MAX_PR_THREADS > 1) {
+         this.partitionedRegionPool = new PooledExecutorWithDMStats(poolQueue, 
+             MAX_PR_THREADS, this.stats.getPartitionedRegionPoolHelper(), tf);
+       } else {
+         SerialQueuedExecutorWithDMStats executor = new SerialQueuedExecutorWithDMStats(poolQueue, 
+             this.stats.getPartitionedRegionPoolHelper(), tf);
+         this.partitionedRegionThread = executor;
+       }
+       
+     }
+ 
+     {
+       BlockingQueue poolQueue;
+       if (INCOMING_QUEUE_LIMIT == 0) {
+         poolQueue = new OverflowQueueWithDMStats(this.stats.getFunctionExecutionQueueHelper());
+       } else {
+         poolQueue = new OverflowQueueWithDMStats(INCOMING_QUEUE_LIMIT, this.stats.getFunctionExecutionQueueHelper());
+       }
+       ThreadFactory tf = new ThreadFactory() {
+         private int next = 0;
+ 
+         public Thread newThread(final Runnable command) {
+           DistributionManager.this.stats.incFunctionExecutionThreadStarts();
+           final Runnable r = new Runnable() {
+               public void run() {
+                 stats.incFunctionExecutionThreads(1);
+                 isFunctionExecutionThread.set(Boolean.TRUE);
+                 try {
+                   ConnectionTable.threadWantsSharedResources();
+                   Connection.makeReaderThread();
+                   runUntilShutdown(command);
+                 } finally {
+                   ConnectionTable.releaseThreadsSockets();
+                   stats.incFunctionExecutionThreads(-1);
+                 }
+               }
+             };
+           Thread thread = new Thread(group, r, 
+                                      "Function Execution Processor" + (next++));
+           thread.setDaemon(true);
+           return thread;
+         }
+       };
+       
+       if(MAX_FE_THREADS > 1){
+         this.functionExecutionPool = new FunctionExecutionPooledExecutor(poolQueue, 
+             MAX_FE_THREADS, this.stats.getFunctionExecutionPoolHelper(), tf,true /*for fn exec*/);
+       } else {
+         SerialQueuedExecutorWithDMStats executor = new SerialQueuedExecutorWithDMStats(poolQueue, 
+             this.stats.getFunctionExecutionPoolHelper(), tf);
+         this.functionExecutionThread = executor;
+       }
+     
+     }
+     
+     if (!SYNC_EVENTS) {
+       this.memberEventThread = new Thread(group, new MemberEventInvoker(), 
+           "DM-MemberEventInvoker");
+       this.memberEventThread.setDaemon(true);
+     }
+ 
+     StringBuffer sb = new StringBuffer(" (took ");
+ 
+    long start = System.currentTimeMillis();
+     
+     // Create direct channel first
+ //    DirectChannel dc = new DirectChannel(new MyListener(this), system.getConfig(), logger, null);
+ //    setDirectChannelPort(dc.getPort()); // store in a thread local
+ 
+     // connect to JGroups
+     start = System.currentTimeMillis();
+     
+     MyListener l = new MyListener(this);
+     membershipManager = MemberFactory.newMembershipManager(l, system.getConfig(), transport, stats);
+ 
+     sb.append(System.currentTimeMillis() - start);
+ 
+     this.myid = membershipManager.getLocalMember();
+ 
+ //    dc.patchUpAddress(this.myid);
+ //    id.setDirectChannelPort(dc.getPort());
+ 
+     // create the distribution channel
+     this.channel = new DistributionChannel(membershipManager);
+ 
+     membershipManager.postConnect();
+     
+     //Assert.assertTrue(this.getChannelMap().size() >= 1);
+     //       System.out.println("Channel Map:");
+     //       for (Iterator iter = this.getChannelMap().entrySet().iterator();
+     //            iter.hasNext(); ) {
+     //         Map.Entry entry = (Map.Entry) iter.next();
+     //         Object key = entry.getKey();
+     //         System.out.println("  " + key + " a " +
+     //                            key.getClass().getName() + " -> " +
+     //                            entry.getValue());
+     //       }
+ 
+     sb.append(" ms)");
+ 
+     logger.info(LocalizedMessage.create(LocalizedStrings.DistributionManager_STARTING_DISTRIBUTIONMANAGER_0_1,
+         new Object[] { this.myid, (logger.isInfoEnabled(LogMarker.DM) ? sb.toString() : "")}));
+ 
+     this.description = NAME + " on " + this.myid + " started at "
+       + (new Date(System.currentTimeMillis())).toString();
+ 
+     finishedConstructor = true;
+     } finally {
+       if (!finishedConstructor) {
+         askThreadsToStop(); // fix for bug 42039
+       }
+     }
+   }
+ 
+   /**
+    * Creates a new distribution manager
+    *
+    * @param system
+    *        The distributed system to which this distribution manager
+    *        will send messages.
+    */
+   private DistributionManager(
+     InternalDistributedSystem system,
+     RemoteTransportConfig transport)
+   {
+     this(transport, system);
+ 
+     boolean finishedConstructor = false;
+     try {
+ 
+     isStartupThread.set(Boolean.TRUE);
+     
+     startThreads();
+     
+     // Since we need a StartupResponseMessage to make sure licenses
+     // are compatible the following has been deadcoded.
+ //     // For the time being, invoke processStartupResponse()
+ //     String rejectionMessage = null;
+ //     if (GemFireVersion.getGemFireVersion().
+ //         equals(state.getGemFireVersion())) {
+ //       rejectionMessage = "Rejected new system node " +
+ //         this.getDistributionManagerId() + " with version \"" +
+ //         GemFireVersion.getGemFireVersion() +
+ //         "\" because the distributed system's version is \"" +
+ //         state.getGemFireVersion() + "\".";
+ //     }
+ //     this.processStartupResponse(state.getCacheTime(),
+ //                         rejectionMessage);
+ 
+     // Allow events to start being processed.
+     membershipManager.startEventProcessing();
+     for (;;) {
+       this.getCancelCriterion().checkCancelInProgress(null);
+       boolean interrupted = Thread.interrupted();
+       try {
+         membershipManager.waitForEventProcessing();
+         break;
+       }
+       catch (InterruptedException e) {
+         interrupted = true;
+       }
+       finally {
+         if (interrupted) {
+           Thread.currentThread().interrupt();
+         }
+       }
+     }
+     
+     synchronized (DistributionManager.class) {
+       openDMs++;
+     }
+     finishedConstructor = true;
+     } finally {
+       if (!finishedConstructor) {
+         askThreadsToStop(); // fix for bug 42039
+       }
+     }
+   }
+ 
+   ////////////////////  Instance Methods  /////////////////////
+ 
+   /**
+    * Returns true if the two members are on the same equivalent host based 
+    * on overlapping IP addresses collected for all NICs during exchange of
+    * startup messages.
+    * 
+    * @param member1 First member
+    * @param member2 Second member
+    */
+   public boolean areOnEquivalentHost(InternalDistributedMember member1,
+                                      InternalDistributedMember member2) {
+     Set<InetAddress> equivalents1 = getEquivalents(member1.getInetAddress());
+     return equivalents1.contains(member2.getInetAddress());
+   }
+   
+   /**
+    * Set the host equivalencies for a given host.  This overrides any
+    * previous information in the tables.
+    * @param equivs set of InetAddresses that all point at the same host
+    */
+   public void setEquivalentHosts(Set<InetAddress> equivs) {
+     Iterator<InetAddress> it = equivs.iterator();
+     synchronized (equivalentHosts) {
+      while (it.hasNext()) {
+        equivalentHosts.put(it.next(), Collections.unmodifiableSet(equivs));
+      }
+     }
+   }
+   
+   public HashMap<InetAddress, Set<InetAddress>> getEquivalentHostsSnapshot() {
+     synchronized (this.equivalentHosts) {
+       return new HashMap<InetAddress, Set<InetAddress>>(this.equivalentHosts);
+     }
+   }
+   
+   /**
+    * Return all of the InetAddresses that are equivalent to the given one (same
+    * host)
+    * @param in host to match up
+    * @return all the addresses thus equivalent
+    */
+   public Set<InetAddress> getEquivalents(InetAddress in) {
+     Set<InetAddress> result;
+     synchronized (equivalentHosts) {
+       result = equivalentHosts.get(in);
+     }
+     //DS 11/25/08 - It appears that when using VPN, the distributed member
+     //id is the vpn address, but that doesn't show up in the equivalents.
+     if(result == null) {
+       result = Collections.singleton(in);
+     }
+     return result;
+   }
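+   // Illustrative example: after setEquivalentHosts({addrA, addrB}), both addrA and addrB map to
+   // the full set {addrA, addrB}, so getEquivalents(addrA).contains(addrB) returns true and
+   // areOnEquivalentHost() treats members bound to addrA and addrB as being on the same host.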
+   
+   public void setRedundancyZone(InternalDistributedMember member, String redundancyZone) {
+     if(redundancyZone != null && !redundancyZone.equals("")) {
+       this.redundancyZones.put(member, redundancyZone);
+     }
+     if (member != getDistributionManagerId()) {
+       String relationship = areInSameZone(getDistributionManagerId(), member) ? ""
+           : "not ";
+       Object[] logArgs = new Object[] { member, relationship };
+       logger.info(LocalizedMessage.create(LocalizedStrings.DistributionManager_DISTRIBUTIONMANAGER_MEMBER_0_IS_1_EQUIVALENT, logArgs));
+     }
+   }
+ 
+   /**
+    * Set the flag indicating that we should enforce unique zones.
+    * If we are already enforcing unique zones, keep it that way.
+    */
+   public void setEnforceUniqueZone(boolean enforceUniqueZone) {
+     this.enforceUniqueZone |= enforceUniqueZone;
+   }
+   
+   public boolean enforceUniqueZone() {
+     return enforceUniqueZone;
+   }
+   
+   public String getRedundancyZone(InternalDistributedMember member) {
+     return redundancyZones.get(member);
+   }
+   
+   /**
+    * Asserts that the distribution manager type is NORMAL, LONER,
+    * ADMIN_ONLY, or LOCATOR.  Also asserts that the distributionManagerId
+    * (jgroups DistributedMember) has a VmKind that matches.
+    */
+   private void assertDistributionManagerType() {
+     // Assert that dmType is one of the three DM types...
+     int theDmType = getDMType();
+     switch (theDmType) {
+     case NORMAL_DM_TYPE:
+     case LONER_DM_TYPE:
+     case ADMIN_ONLY_DM_TYPE:
+     case LOCATOR_DM_TYPE:
+       break;
+     default:
+       Assert.assertTrue(false, "unknown distribution manager type");
+     }
+     
+     // Assert InternalDistributedMember VmKind matches this DistributionManagerType...
+     final InternalDistributedMember theId = getDistributionManagerId();
+     final int vmKind = theId.getVmKind();
+     if (theDmType != vmKind) {
+       Assert.assertTrue(false, 
+           "InternalDistributedMember has a vmKind of " + vmKind + 
+           " instead of " + theDmType);
+     }
+   }
+ 
+   public int getDMType() {
+     return this.dmType;
+   }
+   
+   public List<InternalDistributedMember> getViewMembers() {
+     NetView result = null;
+     DistributionChannel ch = this.channel;
+     if (ch != null) {
+       MembershipManager mgr = ch.getMembershipManager();
+       if (mgr != null) {
+         result = mgr.getView();
+       }
+     }
+     if (result == null) {
+       result = new NetView();
+     }
+     return result.getMembers();
+   }
+   /* implementation of DM.getOldestMember */
+   public DistributedMember getOldestMember(Collection c) throws NoSuchElementException {
+     List<InternalDistributedMember> view = getViewMembers();
+     for (int i=0; i<view.size(); i++) {
+       Object viewMbr = view.get(i);
+       Iterator it = c.iterator();
+       while (it.hasNext()) {
+         Object nextMbr = it.next();
+         if (viewMbr.equals(nextMbr)) {
+           return (DistributedMember)nextMbr;
+         }
+       }
+     }
+     throw new NoSuchElementException(LocalizedStrings.DistributionManager_NONE_OF_THE_GIVEN_MANAGERS_IS_IN_THE_CURRENT_MEMBERSHIP_VIEW.toLocalizedString());
+   }
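+ 
+   // Usage sketch (dm and candidates are hypothetical caller-side names): the
+   // method walks the membership view in order and returns the first candidate
+   // it finds, so the result is the longest-lived of the given members.
+   //   try {
+   //     DistributedMember elder = dm.getOldestMember(candidates);
+   //   } catch (NoSuchElementException e) {
+   //     // none of the candidates is in the current membership view
+   //   }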
+   
+   private boolean testMulticast() {
+     return this.membershipManager.testMulticast();
+   }
+ 
+   /**
+    * Print a membership view (list of {@link InternalDistributedMember}s)
+    * 
+    * @param v the view to print
+    * @return the view as a string, or "null" if the view is null
+    */
+   static public String printView(NetView v) {
+     if (v == null)
+       return "null";
+     
+     return v.toString();
+   }
+ 
+   /**
+    * Starts this distribution manager's threads.  This must be done outside
+    * the constructor so that subclass constructors can finish first.
+    */
+   protected void startThreads() {
+     this.system.setDM(this); // fix for bug 33362
+     if (this.memberEventThread != null)
+       this.memberEventThread.start();
+     try {
+       
+       // And the distinguished guests today are...
+       NetView v = membershipManager.getView();
+       logger.info(LocalizedMessage.create(LocalizedStrings.DistributionManager_INITIAL_MEMBERSHIPMANAGER_VIEW___0, printView(v)));
+       
+       // Add them all to our view
+       Iterator<InternalDistributedMember> it = v.getMembers().iterator();
+       while (it.hasNext()) {
+         addNewMember(it.next());
+       }
+       
+       // Figure out who the elder is...
+       selectElder(); // ShutdownException could be thrown here
+     } catch (Exception ex) {
+       throw new InternalGemFireException(LocalizedStrings.DistributionManager_COULD_NOT_PROCESS_INITIAL_VIEW.toLocalizedString(), ex);
+     }
+     try {
+       getWaitingThreadPool().execute(new Runnable() {
+           public void run() {
+             // call in background since it might need to send a reply
+             // and we are not ready to send messages until startup is finished
+             isStartupThread.set(Boolean.TRUE);
+             readyForMessages();
+           }
+         });
+     }
+     catch (VirtualMachineError err) {
+       SystemFailure.initiateFailure(err);
+       // If this ever returns, rethrow the error.  We're poisoned
+       // now, so don't let this thread continue.
+       throw err;
+     }
+     catch (Throwable t) {
+       // Whenever you catch Error or Throwable, you must also
+       // catch VirtualMachineError (see above).  However, there is
+       // _still_ a possibility that you are dealing with a cascading
+       // error condition, so you also need to check to see if the JVM
+       // is still usable:
+       SystemFailure.checkFailure();
+       logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributionManager_UNCAUGHT_EXCEPTION_CALLING_READYFORMESSAGES), t);
+     }
+   }
+ 
+   protected void readyForMessages() {
+     synchronized (this) {
+       this.readyForMessages = true;
+       this.notifyAll();
+     }
+     membershipManager.startEventProcessing();
+   }
+   
+   protected void waitUntilReadyForMessages() {
+     if (readyForMessages)
+       return;
+ //    membershipManager.waitForEventProcessing();
+     synchronized (this) {
+       for (;;) {
+         if (readyForMessages)
+           break;
+         stopper.checkCancelInProgress(null);
+         boolean interrupted = Thread.interrupted();
+         try {
+           this.wait();
+         }
+         catch (InterruptedException e) {
+           interrupted = true;
+           stopper.checkCancelInProgress(e);
+         }
+         finally {
+           if (interrupted) {
+             Thread.currentThread().interrupt();
+           }
+         }
+       } // for
+     } // synchronized
+   }
+ 
+   /**
+    * Call when the DM is ready to send messages.
+    */
+   private void readyToSendMsgs() {
+     synchronized (this.readyToSendMsgsLock) {
+       this.readyToSendMsgs = true;
+       this.readyToSendMsgsLock.notifyAll();
+     }
+   }
+   /**
+    * Returns once the DM is ready to send out messages.
+    * @param msg the message that is currently being sent
+    */
+   protected void waitUntilReadyToSendMsgs(DistributionMessage msg) {
+     if (this.readyToSendMsgs) {
+       return;
+     }
+     // another process may have been started in the same view, so we need
+     // to be responsive to startup messages and be able to send responses
+     if (msg instanceof StartupMessage || msg instanceof StartupResponseMessage
+         || msg instanceof AdminMessageType) {
+       return;
+     }
+     if (isStartupThread.get() != null) {
+       // let the startup thread send messages
+       // the only case I know of that does this is if we happen to log a
+       // message during startup and an alert listener has registered.
+       return;
+     }
+ //    membershipManager.waitForEventProcessing();
+     synchronized (this.readyToSendMsgsLock) {
+       for (;;) {
+         if (this.readyToSendMsgs)
+           break;
+         stopper.checkCancelInProgress(null);
+         boolean interrupted = Thread.interrupted();
+         try {
+           this.readyToSendMsgsLock.wait();
+         }
+         catch (InterruptedException e) {
+           interrupted = true;
+           stopper.checkCancelInProgress(e);
+         }
+         finally {
+           if (interrupted) {
+             Thread.currentThread().interrupt();
+           }
+         }
+       } // for
+     } // synchronized
+   }
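+ 
+   // Note on the two wait methods above: both follow the same gating pattern,
+   // namely a fast unsynchronized check of the flag, then a synchronized loop
+   // that re-checks the flag, honors cancellation via
+   // stopper.checkCancelInProgress(), and restores the thread's interrupt
+   // status before returning.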
+   
+   // DM method
+   @Override
+   public void forceUDPMessagingForCurrentThread() {
+     membershipManager.forceUDPMessagingForCurrentThread();
+   }
+   
+   // DM method
+   @Override
+   public void releaseUDPMessagingForCurrentThread() {
+     membershipManager.releaseUDPMessagingForCurrentThread();
+   }
+ 
+   /**
+    * Did an exception occur in one of the threads launched by this
+    * distribution manager?
+    */
+   public boolean exceptionInThreads() {
+     return this.exceptionInThreads || this.threadGroup.getUncaughtExceptionsCount() > 0;
+   }
+ 
+   /**
+    * Clears the boolean that determines whether or not an exception
+    * occurred in one of the worker threads.  This method should be
+    * used for testing purposes only!
+    */
+   void clearExceptionInThreads() {
+     this.exceptionInThreads = false;
+     this.threadGroup.clearUncaughtExceptionsCount();
+   }
+ 
+   /**
+    * Returns the current "cache time" in milliseconds since the epoch.
+    * The "cache time" takes into account skew among the local clocks
+    * on the various machines involved in the cache.
+    */
+   public long cacheTimeMillis() {
+     return this.system.getClock().cacheTimeMillis();
+   }
+ 
+ 
+ 
+   /**
+    * Returns the id of this distribution manager.
+    */
+   public InternalDistributedMember getDistributionManagerId() {
+     return this.myid;
+   }
+ 
+   /**
+    * Returns an unmodifiable set containing the identities of all of
+    * the known (non-admin-only) distribution managers.
+    */
+   public Set getDistributionManagerIds() {
+     // access to members synchronized under membersLock in order to 
+     // ensure serialization
+     synchronized (this.membersLock) {
+       return this.members.keySet();
+     }
+   }
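+ 
+   // The members map is maintained copy-on-write (see the comment in
+   // getCanonicalId below), which is why keySet() can be handed out here
+   // without making a defensive copy.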
+   
+   /**
+    * Adds the entry in {@link #hostedLocatorsAll} for a member with one or more
+    * hosted locators. The value is a collection of host[port] strings. If a 
+    * bind-address was used for a locator then the form is bind-addr[port].
+    *
+    * @since 6.6.3
+    */
+   public void addHostedLocators(InternalDistributedMember member, Collection<String> locators, boolean isSharedConfigurationEnabled) {
+     synchronized (this.membersLock) {
+       if (locators == null || locators.isEmpty()) {
+         throw new IllegalArgumentException("Cannot use empty collection of locators");
+       }
+       if (this.hostedLocatorsAll.isEmpty()) {
+         this.hostedLocatorsAll = new HashMap<InternalDistributedMember, Collection<String>>();
+       }
+       
+       if (!this.isSharedConfigEnabledForDS) {
+         this.isSharedConfigEnabledForDS = isSharedConfigurationEnabled;
+       }
+       
+       Map<InternalDistributedMember, Collection<String>> tmp = 
+           new HashMap<InternalDistributedMember, Collection<String>>(this.hostedLocatorsAll);
+       tmp.remove(member);
+       tmp.put(member, locators);
+       tmp = Collections.unmodifiableMap(tmp);
+       this.hostedLocatorsAll = tmp;
+       
+       if (isSharedConfigurationEnabled) {
+         if (locators == null || locators.isEmpty()) {
+           throw new IllegalArgumentException("Cannot use empty collection of locators");
+         }
+         if (this.hostedLocatorsWithSharedConfiguration.isEmpty()) {
+           this.hostedLocatorsWithSharedConfiguration = new HashMap<InternalDistributedMember, Collection<String>>();
+         }
+         tmp = new HashMap<InternalDistributedMember, Collection<String>>(this.hostedLocatorsWithSharedConfiguration);
+         tmp.remove(member);
+         tmp.put(member, locators);
+         tmp = Collections.unmodifiableMap(tmp);
+         this.hostedLocatorsWithSharedConfiguration = tmp;
+       }
+ 
+     }
+   }
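+ 
+   // Note on the pattern above: hostedLocatorsAll and
+   // hostedLocatorsWithSharedConfiguration are treated as copy-on-write maps.
+   // Under membersLock a writer copies the current map into a new HashMap,
+   // applies its change, wraps the result with Collections.unmodifiableMap and
+   // publishes it by reassigning the field, so the getters below can return
+   // the current map without copying it again.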
+   
+   
+   private void removeHostedLocators(InternalDistributedMember member) {
+     synchronized (this.membersLock) {
+       if (this.hostedLocatorsAll.containsKey(member)) {
+         Map<InternalDistributedMember, Collection<String>> tmp = 
+             new HashMap<InternalDistributedMember, Collection<String>>(this.hostedLocatorsAll);
+         tmp.remove(member);
+         if (tmp.isEmpty()) {
+           tmp = Collections.emptyMap();
+         } else {
+           tmp = Collections.unmodifiableMap(tmp);
+         }
+         this.hostedLocatorsAll = tmp;
+       }
+       if (this.hostedLocatorsWithSharedConfiguration.containsKey(member)) {
+         Map<InternalDistributedMember, Collection<String>> tmp = 
+             new HashMap<InternalDistributedMember, Collection<String>>(this.hostedLocatorsWithSharedConfiguration);
+         tmp.remove(member);
+         if (tmp.isEmpty()) {
+           tmp = Collections.emptyMap();
+         } else {
+           tmp = Collections.unmodifiableMap(tmp);
+         }
+         this.hostedLocatorsWithSharedConfiguration = tmp;
+       }
+     }
+   }
+   
+   
+   
+   /**
+    * Gets the value in {@link #hostedLocatorsAll} for a member with one or more
+    * hosted locators. The value is a collection of host[port] strings. If a 
+    * bind-address was used for a locator then the form is bind-addr[port].
+    * 
+    * @since 6.6.3
+    */
+   public Collection<String> getHostedLocators(InternalDistributedMember member) {
+     synchronized (this.membersLock) {
+       return this.hostedLocatorsAll.get(member);
+     }
+   }
+   
+   /**
+    * Returns a copy of the map of all members hosting locators. The key is the 
+    * member, and the value is a collection of host[port] strings. If a 
+    * bind-address was used for a locator then the form is bind-addr[port].
+    * 
+    * @since 6.6.3
+    */
+   public Map<InternalDistributedMember, Collection<String>> getAllHostedLocators() {
+     synchronized (this.membersLock) {
+       return this.hostedLocatorsAll;
+     }
+   }
+   /**
+    * Returns a copy of the map of all members hosting locators with shared configuration. The key is the 
+    * member, and the value is a collection of host[port] strings. If a 
+    * bind-address was used for a locator then the form is bind-addr[port].
+    * 
+    * @since 8.0
+    */
+   @Override
+   public Map<InternalDistributedMember, Collection<String>> getAllHostedLocatorsWithSharedConfiguration() {
+     synchronized (this.membersLock) {
+       return this.hostedLocatorsWithSharedConfiguration;
+     }
+   }
+   /**
+    * Returns an unmodifiable set containing the identities of all of
+    * the known (including admin) distribution managers.
+    */
+   public Set getDistributionManagerIdsIncludingAdmin() {
+     // access to members synchronized under membersLock in order to 
+     // ensure serialization
+     synchronized (this.membersLock) {
+       return this.membersAndAdmin;
+     }
+   }
+   
+ 
+   /**
+    * Returns the low-level distribution channel for this distribution
+    * manager. (brought over from ConsoleDistributionManager)
+    *
+    * @since 4.0
+    */
+   public DistributionChannel getDistributionChannel() {
+     return this.channel;
+   }
+ 
+ 
+   /**
+    * Returns a freshly allocated set containing the identities of all
+    * the other known distribution managers, not including this member.
+    */
+   public Set getOtherDistributionManagerIds() {
+     // We return a modified copy of the list, so
+     // collect the old list and copy under the lock.
+     Set result = new HashSet(getDistributionManagerIds());
+ 
+     InternalDistributedMember me = getDistributionManagerId();
+     result.remove(me);
+ 
+     // It's okay for my own id to not be in the list of all ids yet.
+     return result;
+   }
+   @Override
+   public Set getOtherNormalDistributionManagerIds() {
+     // We return a modified copy of the list, so
+     // collect the old list and copy under the lock.
+     Set result = new HashSet(getNormalDistributionManagerIds());
+ 
+     InternalDistributedMember me = getDistributionManagerId();
+     result.remove(me);
+ 
+     // It's okay for my own id to not be in the list of all ids yet.
+     return result;
+   }
+ 
+   public InternalDistributedMember getCanonicalId(DistributedMember id) {
+     // the members set is copy-on-write, so it is safe to iterate over it
+     InternalDistributedMember result = this.members.get(id);
+     if (result == null) {
+       return (InternalDistributedMember)id;
+     }
+     return result;
+   }
+ 
+   /**
+    * Adds a membership listener and returns the other DistributionManagerIds
+    * as an atomic operation.
+    */
+   public Set addMembershipListenerAndGetDistributionManagerIds(MembershipListener l) {
+     // switched sync order to fix bug 30360
+     synchronized (this.membersLock) {
+       // Don't let the members come and go while we are adding this
+       // listener.  This ensures that the listener (probably a
+       // ReplyProcessor) gets a consistent view of the members.
+       addMembershipListener(l);
+       // Note it is ok to return the members set
+       // because we will never modify the returned set.
+       return members.keySet();
+     }
+   }
+ 
+   public void addNewMember(InternalDistributedMember member) {
+     // This is the place to cleanup the zombieMembers
+     int vmType = member.getVmKind();
+     switch (vmType) {
+       case ADMIN_ONLY_DM_TYPE:
+         handleConsoleStartup(member);
+         break;
+       case LOCATOR_DM_TYPE:
+       case NORMAL_DM_TYPE:
+         handleManagerStartup(member);
+         break;        
+       default:
+         throw new InternalGemFireError(LocalizedStrings.DistributionManager_UNKNOWN_MEMBER_TYPE_0.toLocalizedString(Integer.valueOf(vmType)));
+     }
+   }
+ 
+   /**
+    * Returns the identity of this <code>DistributionManager</code>
+    */
+   public InternalDistributedMember getId() {
+     return this.myid;
+   }
+ 
+   /**
+    * Returns the id of the underlying distribution channel used for
+    * communication.
+    *
+    * @since 3.0
+    */
+   public long getChannelId() {
+     return this.channel.getId();
+   }
+ 
+   /**
+    * Adds a message to the outgoing queue.  Note that
+    * <code>message</code> should not be modified after it has been
+    * added to the queue.  After <code>message</code> is distributed,
+    * it will be recycled.
+    *
+    * @return list of recipients who did not receive the message
+    * @throws NotSerializableException if the content is not serializable
+    */
+   public Set putOutgoingUserData(final DistributionMessage message) 
+       throws NotSerializableException {
+     return sendMessage(message); 
+   }
+ 
+   /**
+    * Send outgoing data; message is guaranteed to be serialized.
+    * @return list of recipients who did not receive the message
+    * @throws InternalGemFireException if message is not serializable
+    */
+   public Set putOutgoing(final DistributionMessage msg) {
+     try {
+       DistributionMessageObserver observer = DistributionMessageObserver.getInstance();
+       if(observer != null) {
+         observer.beforeSendMessage(this, msg);
+       }
+       return sendMessage(msg);
+     }
+     catch (NotSerializableException e) {
+       throw new InternalGemFireException(e);
+     }
+     catch (ToDataException e) {
+       // exception from user code
+       throw e;
+     }
+   }
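+ 
+   // Note: if a DistributionMessageObserver has been registered (commonly done
+   // by tests), it sees every message passed to putOutgoing() before it is
+   // handed to sendMessage().  A ToDataException raised while serializing user
+   // data is rethrown to the caller unchanged, while a NotSerializableException
+   // is wrapped in an InternalGemFireException.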
+ 
+   @Override
+   public String toString() {
+     return this.description;
+   }
+ 
+   /**
+    * @see #closeInProgress
+    */
+   private final Object shutdownMutex = new Object();
+   
+   /**
+    * Informs other members that this dm is shutting down.
+    * Stops the pusher, puller, and processor threads and closes the
+    * connection to the transport layer.
+    */
+   protected void shutdown() {
+     // Make sure only one thread initiates shutdown...
+     synchronized (shutdownMutex) {
+       if (closeInProgress) {
+         return;
+       }
+       this.closeInProgress = true;
+     } // synchronized
+ 
+     // [bruce] log shutdown at info level and with ID to balance the
+     // "Starting" message.  recycleConn.conf is hard to debug w/o this
+     final String exceptionStatus = (this.exceptionInThreads() ? LocalizedStrings.DistributionManager_AT_LEAST_ONE_EXCEPTION_OCCURRED.toLocalizedString() : "");
+     logger.info(LocalizedMessage.create(
+         LocalizedStrings.DistributionManager_SHUTTING_DOWN_DISTRIBUTIONMANAGER_0_1,
+         new Object[] {this.myid, exceptionStatus}));
+ 
+     final long start = System.currentTimeMillis();
+     try {
+       if (this.rootCause instanceof ForcedDisconnectException) {
+         if (logger.isDebugEnabled()) {
+           logger.debug("inhibiting sending of shutdown message to other members due to forced-disconnect");
+         }
+       } else {
+         // Don't block indefinitely trying to send the shutdown message, in
+         // case other VMs in the system are ill-behaved. (bug 34710)
+         final Runnable r = new Runnable() {
+           public void run() {
+             try {
+               ConnectionTable.threadWantsSharedResources();
+               sendShutdownMessage();
+             }
+             catch (final CancelException e) {
+               // We were terminated.
+               logger.debug("Cancelled during shutdown message", e);
+             }
+           }
+         };
+         final Thread t = new Thread(threadGroup,
+             r, LocalizedStrings.DistributionManager_SHUTDOWN_MESSAGE_THREAD_FOR_0.toLocalizedString(this.myid));
+         t.start();
+         boolean interrupted = Thread.interrupted();
+         try {
+           t.join(MAX_STOP_TIME);
+         }
+         catch (final InterruptedException e) {
+           interrupted = true;
+           t.interrupt();
+           logger.warn(LocalizedMessage.create(
+               LocalizedStrings.DistributionManager_INTERRUPTED_SENDING_SHUTDOWN_MESSAGE_TO_PEERS), e);
+         }
+         finally {
+           if (interrupted) {
+             Thread.currentThread().interrupt();
+           }
+         }
+ 
+         if (t.isAlive()) {
+           t.interrupt();
+           logger.warn(LocalizedMessage.create(LocalizedStrings.DistributionManager_FAILED_SENDING_SHUTDOWN_MESSAGE_TO_PEERS_TIMEOUT));
+         }
+       }
+       
+     }
+     finally {
+       this.shutdownMsgSent = true; // in case sendShutdownMessage failed....
+       try {
+         this.uncleanShutdown(false);
+       }
+       finally {
+         final Long delta = Long.valueOf(System.currentTimeMillis() - start);
+         logger.info(LocalizedMessage.create(
+           LocalizedStrings.DistributionManager_DISTRIBUTIONMANAGER_STOPPED_IN_0_MS, delta));
+       }
+     }
+   }
+   
+   private void askThreadsToStop() {
+     // Stop executors after they have finished
+     ExecutorService es;
+     es = this.serialThread;
+     if (es != null) {
+       es.shutdown();
+     }
+     es = this.viewThread;
+     if (es != null) {
+       // Hmmm...OK, I'll let any view events currently in the queue be
+       // processed.  Not sure it's very important whether they get
+       // handled...
+       es.shutdown();
+     }
+     if (this.serialQueuedExecutorPool != null) {
+       this.serialQueuedExecutorPool.shutdown();
+     }
+     es = this.functionExecutionThread;
+     if (es != null) {
+       es.shutdown();
+     }
+     es = this.functionExecutionPool;
+     if (es != null) {
+       es.shutdown();
+     }
+     es = this.partitionedRegionThread;
+     if (es != null) {
+       es.shutdown();
+     }
+     es = this.partitionedRegionPool;
+     if (es != null) {
+       es.shutdown();
+     }
+     es = this.highPriorityPool;
+     if (es != null) {
+       es.shutdown();
+     }
+     es = this.waitingPool;
+     if (es != null) {
+       es.shutdown();
+     }
+     es = this.prMetaDataCleanupThreadPool;
+     if (es != null) {
+       es.shutdown();
+     }
+     es = this.threadPool;
+     if (es != null) {
+       es.shutdown();
+     }
+     
+     Thread th = this.memberEventThread;
+     if (th != null)
+       th.interrupt();
+   }
+   
+   private void waitForThreadsToStop(long timeInMillis) throws InterruptedException {
+     long start = System.currentTimeMillis();
+     long remaining = timeInMillis;
+     
+     ExecutorService[] allExecutors = new ExecutorService[] {
+         this.serialThread, 
+         this.viewThread, 
+         this.functionExecutionThread, 
+         this.functionExecutionPool,
+         this.partitionedRegionThread, 
+         this.partitionedRegionPool,
+         this.highPriorityPool,
+         this.waitingPool,
+         this.prMetaDataCleanupThreadPool,
+         this.threadPool};
+     for(ExecutorService es : allExecutors) { 
+       if (es != null) {
+         es.awaitTermination(remaining, TimeUnit.MILLISECONDS);
+       }
+       remaining = timeInMillis - (System.currentTimeMillis() - start);
+       if(remaining <= 0) {
+         return;
+       }
+     }
+     
+     
+     if (this.serialQueuedExecutorPool != null) { // guard as in askThreadsToStop()
+       this.serialQueuedExecutorPool.awaitTermination(remaining, TimeUnit.MILLISECONDS);
+     }
+     remaining = timeInMillis - (System.currentTimeMillis() - start);
+     if(remaining <= 0) {
+       return;
+     }
+     Thread th = this.memberEventThread;
+     if (th != null) {
+       th.interrupt(); // bug #43452 - this thread sometimes eats interrupts, so we interrupt it again here
+       th.join(remaining);
+     }
+     
+   }
+   
+   /**
+    * maximum time, in milliseconds, to wait for all threads to exit
+    */
+   static private final int MAX_STOP_TIME = 20000;
+   
+   /**
+    * Time to sleep, in milliseconds, while polling to see if threads have 
+    * finished
+    */
+   static private final int STOP_PAUSE_TIME = 1000;
+   
+   /**
+    * Maximum number of interrupt attempts to stop a thread
+    */
+   static private final int MAX_STOP_ATTEMPTS = 10;
+   
+   /**
+    * Cheap tool to kill a referenced thread
+    * 
+    * @param t the thread to kill
+    */
+   private void clobberThread(Thread t) {
+     if (t == null)
+       return;
+     if (t.isAlive()) {
+       logger.warn(LocalizedMessage.create(LocalizedStrings.DistributionManager_FORCING_THREAD_STOP_ON__0_, t));
+       
+       // Start by being nice.
+       t.interrupt();
+       
+ // we could be more violent here...
+ //      t.stop();
+       try {
+         for (int i = 0; i < MAX_STOP_ATTEMPTS && t.isAlive(); i++) {
+           t.join(STOP_PAUSE_TIME);
+           t.interrupt();
+         }
+       }
+       catch (InterruptedException ex) {
+         logger.debug("Interrupted while attempting to terminate threads.");
+         Thread.currentThread().interrupt();
+         // just keep going
+       }
+       
+       if (t.isAlive()) {
+         logger.warn(LocalizedMessage.create(LocalizedStrings.DistributionManager_CLOBBERTHREAD_THREAD_REFUSED_TO_DIE__0, t));
+       }
+     }
+   }
+   
+   /**
+    * Cheap tool to examine an executor to see if it is still working.
+    * @param tpe the executor to examine
+    * @param name the pool name to use in log messages
+    * @return true if the executor still has active threads
+    */
+   private boolean executorAlive(ThreadPoolExecutor tpe, String name)
+   {
+     if (tpe == null) {
+       return false;
+     } else {
+       int ac = tpe.getActiveCount();
+ //      boolean result = tpe.getActiveCount() > 0;
+       if (ac > 0) {
+         if (logger.isDebugEnabled()) {
+           logger.debug("Still waiting for {} threads in '{}' pool to exit", ac, name);
+         }
+         return true;
+       } else {
+         return false;
+       }
+     }
+   }
+   
+   /**
+    * Wait for the ancillary queues to exit.  Kills them if they are
+    * still around.
+    *
+    */
+   private void forceThreadsToStop() {
+     long endTime = System.currentTimeMillis() + MAX_STOP_TIME;
+     String culprits = "";
+     for (;;) {
+       boolean stillAlive = false;
+       culprits = "";
+       if (executorAlive(this.serialThread, "serial thread")) {
+         stillAlive = true;
+         culprits = culprits + " serial thread;";
+       }
+       if (executorAlive(this.viewThread, "view thread")) {
+         stillAlive = true;
+         culprits = culprits + " view thread;";
+       }
+       if (executorAlive(this.partitionedRegionThread, "partitioned region thread")) {
+         stillAlive = true;
+         culprits = culprits + " partitioned region thread;";
+       }
+       if (executorAlive(this.partitionedRegionPool, "partitioned region pool")) {
+         stillAlive = true;
+         culprits = culprits + " partitioned region pool;";
+       }
+       if (executorAlive(this.highPriorityPool, "high priority pool")) {
+         stillAlive = true;
+         culprits = culprits + " high priority pool;";
+       }
+       if (executorAlive(this.waitingPool, "waiting pool")) {
+         stillAlive = true;
+         culprits = culprits + " waiting pool;";
+       }
+       if (executorAlive(this.prMetaDataCleanupThreadPool, "prMetaDataCleanupThreadPool")) {
+         stillAlive = true;
+         culprits = culprits + " special waiting pool;";
+       }
+       if (executorAlive(this.threadPool, "thread pool")) {
+         stillAlive = true;
+         culprits = culprits + " thread pool;";
+       }
+       
+       if (!stillAlive)
+         return;
+       
+       long now = System.currentTimeMillis();
+       if (now >= endTime)
+         break;
+       
+       try {
+         Thread.sleep(STOP_PAUSE_TIME);
+       }
+       catch (InterruptedException e) {
+         Thread.currentThread().interrupt();
+         // Desperation, the shutdown thread is being killed.  Don't
+         // consult a CancelCriterion.
+         logger.warn(LocalizedMessage.create(
+             LocalizedStrings.DistributionManager_INTERRUPTED_DURING_SHUTDOWN), e); 
+         break;
+       }
+     } // for
+     
+     logger.warn(LocalizedMessage.create(
+         LocalizedStrings.DistributionManager_DAEMON_THREADS_ARE_SLOW_TO_STOP_CULPRITS_INCLUDE_0, culprits));
+     
+     // Kill with no mercy
+     if (this.serialThread != null) {
+       this.serialThread.shutdownNow();
+     }
+     if (this.viewThread != null) {
+       this.viewThread.shutdownNow();
+     }    
+     if (this.functionExecutionThread != null) {
+       this.functionExecutionThread.shutdownNow();
+     }
+     if (this.functionExecutionPool != null) {
+       this.functionExecutionPool.shutdownNow();
+     }
+     if (this.partitionedRegionThread != null) {
+       this.partitionedRegionThread.shutdownNow();
+     }
+     if (this.partitionedRegionPool != null) {
+       this.partitionedRegionPool.shutdownNow();
+     }
+     if (this.highPriorityPool != null) {
+       this.highPriorityPool.shutdownNow();
+     }
+     if (this.waitingPool != null) {
+       this.waitingPool.shutdownNow();
+     }
+     if (this.prMetaDataCleanupThreadPool != null) {
+       this.prMetaDataCleanupThreadPool.shutdownNow();
+     }
+     if (this.threadPool != null) {
+       this.threadPool.shutdownNow();
+     }
+     
+     Thread th = this.memberEventThread;
+     if (th != null) {
+       clobberThread(th);
+     }
+   }
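+ 
+   // Shutdown escalation used by uncleanShutdown(): askThreadsToStop() requests
+   // an orderly ExecutorService.shutdown(), waitForThreadsToStop() waits within
+   // its time budget, and only then does forceThreadsToStop() poll the pools
+   // and fall back to shutdownNow() plus clobberThread() for anything that is
+   // still running.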
+   
+   private volatile boolean shutdownInProgress = false;
+ 
+   /** guard for membershipViewIdAcknowledged */
+   private final Object membershipViewIdGuard = new Object();
+   
+   /** the latest view ID that has been processed by all membership listeners */
+   private long membershipViewIdAcknowledged;
+   
+   public boolean shutdownInProgress() {
+     return this.shutdownInProgress;
+   }
+   
+   /**
+    * Stops the pusher, puller and processor threads and closes the
+    * connection to the transport layer.  This should only be used from
+    * shutdown() or from the dm initialization code
+    */
+   private void uncleanShutdown(boolean duringStartup)
+   {
+     try {
+       this.closeInProgress = true; // set here also to fix bug 36736
+       removeAllHealthMonitors();
+       shutdownInProgress = true;
+       if (this.channel != null) {
+         this.channel.setShutDown();
+       }
+       
+       askThreadsToStop();
+ 
+       // wait a moment before asking threads to terminate
+       try { waitForThreadsToStop(1000); } 
+       catch (InterruptedException ie) {
+         // No need to reset interrupt bit, we're really trying to quit...
+       }
+       forceThreadsToStop();
+       
+ //      // bug36329: desperation measure, send a second interrupt?
+ //      try { Thread.sleep(1000); } 
+ //      catch (InterruptedException ie) {
+ //        // No need to reset interrupt bit, we're really trying to quit...
+ //      }
+ //      forceThreadsToStop();
+     } // try
+     finally {
+       // ABSOLUTELY ESSENTIAL that we close the distribution channel!
+       try {
+         // For safety, but channel close in a finally AFTER this...
+         if (this.stats != null) {
+           this.stats.close();
+           try { Thread.sleep(100); } 
+           catch (InterruptedException ie) {
+             // No need to reset interrupt bit, we're really trying to quit...
+           }
+         }
+       }
+       finally {
+         if (this.channel != null) {
+           logger.info(LocalizedMessage.create(LocalizedStrings.DistributionManager_NOW_CLOSING_DISTRIBUTION_FOR__0, this.myid));
+           this.channel.disconnect(duringStartup);
+ //          this.channel = null;  DO NOT NULL OUT INSTANCE VARIABLES AT SHUTDOWN - bug #42087
+         }
+       }
+     }
+   }
+ 
+   /**
+    * Returns the distributed system to which this distribution manager
+    * is connected.
+    */
+   public InternalDistributedSystem getSystem() {
+     return this.system;
+   }
+   
+   /**
+    * Returns the transport configuration for this distribution manager
+    * @since 5.0
+    */
+   public RemoteTransportConfig getTransport() {
+     return this.transport;
+   }
+ 
+ 
+   /**
+    * Adds a <code>MembershipListener</code> to this distribution
+    * manager.
+    */
+   public void addMembershipListener(MembershipListener l) {
+     this.membershipListeners.putIfAbsent(l, Boolean.TRUE);
+   }
+ 
+   /**
+    * Removes a <code>MembershipListener</code> from this distribution
+    * manager.
+    *
+    * @throws IllegalArgumentException
+    *         <code>l</code> was not registered on this distribution
+    *         manager
+    */
+   public void removeMembershipListener(MembershipListener l) {
+     this.membershipListeners.remove(l);
+   }
+ 
+   /**
+    * Adds a <code>MembershipListener</code> to this distribution
+    * manager.
+    * @since 5.7
+    */
+   public void addAllMembershipListener(MembershipListener l) {
+     synchroniz

<TRUNCATED>