You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/22 19:36:41 UTC
[79/83] [abbrv] incubator-geode git commit: GEODE-917: Merge branch
'feature/GEODE-917' into develop
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java
index 0000000,51bfb53..24b0697
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java
@@@ -1,0 -1,4811 +1,4811 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package com.gemstone.gemfire.distributed.internal;
+
+ import java.io.Externalizable;
+ import java.io.IOException;
+ import java.io.NotSerializableException;
+ import java.io.ObjectInput;
+ import java.io.ObjectOutput;
+ import java.io.Serializable;
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Date;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.NoSuchElementException;
+ import java.util.Set;
+ import java.util.concurrent.BlockingQueue;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.Executor;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.Semaphore;
+ import java.util.concurrent.SynchronousQueue;
+ import java.util.concurrent.ThreadFactory;
+ import java.util.concurrent.ThreadPoolExecutor;
+ import java.util.concurrent.TimeUnit;
+
+ import org.apache.logging.log4j.Logger;
+
+ import com.gemstone.gemfire.CancelCriterion;
+ import com.gemstone.gemfire.CancelException;
+ import com.gemstone.gemfire.ForcedDisconnectException;
+ import com.gemstone.gemfire.IncompatibleSystemException;
+ import com.gemstone.gemfire.InternalGemFireError;
+ import com.gemstone.gemfire.InternalGemFireException;
+ import com.gemstone.gemfire.InvalidDeltaException;
+ import com.gemstone.gemfire.SystemConnectException;
+ import com.gemstone.gemfire.SystemFailure;
+ import com.gemstone.gemfire.ToDataException;
+ import com.gemstone.gemfire.admin.GemFireHealthConfig;
+ import com.gemstone.gemfire.distributed.DistributedMember;
+ import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
+ import com.gemstone.gemfire.distributed.Locator;
+ import com.gemstone.gemfire.distributed.Role;
+ import com.gemstone.gemfire.distributed.internal.locks.ElderState;
+ import com.gemstone.gemfire.distributed.internal.membership.DistributedMembershipListener;
+ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+ import com.gemstone.gemfire.distributed.internal.membership.MemberFactory;
+ import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
+ import com.gemstone.gemfire.distributed.internal.membership.NetView;
+ import com.gemstone.gemfire.i18n.StringId;
+ import com.gemstone.gemfire.internal.Assert;
+ import com.gemstone.gemfire.internal.NanoTimer;
+ import com.gemstone.gemfire.internal.OSProcess;
+ import com.gemstone.gemfire.internal.SetUtils;
+ import com.gemstone.gemfire.internal.SocketCreator;
+ import com.gemstone.gemfire.internal.Version;
+ import com.gemstone.gemfire.internal.admin.remote.AdminConsoleDisconnectMessage;
+ import com.gemstone.gemfire.internal.admin.remote.RemoteGfManagerAgent;
+ import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
+ import com.gemstone.gemfire.internal.cache.InitialImageOperation;
+ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+ import com.gemstone.gemfire.internal.logging.LogService;
+ import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+ import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
+ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+ import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+ import com.gemstone.gemfire.internal.sequencelog.MembershipLogger;
+ import com.gemstone.gemfire.internal.tcp.Connection;
+ import com.gemstone.gemfire.internal.tcp.ConnectionTable;
+ import com.gemstone.gemfire.internal.tcp.ReenteredConnectException;
+ import com.gemstone.gemfire.internal.util.concurrent.StoppableReentrantLock;
+
+ /**
+ * The <code>DistributionManager</code> uses a {@link
+ * MembershipManager} to distribute {@link DistributionMessage messages}
+ * queued in {@link MQueue}s.
+ *
+ * <P>
+ *
+ * Code that wishes to send a {@link DistributionMessage} must get
+ * the <code>DistributionManager</code> and invoke {@link
+ * #putOutgoing}.
+ *
+ * <P>
+ *
+ * <code>DistributionManager</code> is not intended to be
+ * serialized. It is <code>Externalizable</code> only to prevent it
+ * from being copy shared. See {@link #writeExternal}.
+ *
+ * <P>
+ *
+ * Prior to GemFire 4.0, <code>DistributionManager</code> was an
+ * abstract class with two concrete subclasses,
+ * <code>LocalDistributionManager</code> and
+ * <code>ConsoleDistributionManager</code>. We decided that
+ * <code>ConsoleDistributionManager</code> (which was used for the GUI
+ * console and admin APIs) did not offer enough interesting
+ * functionality to warrant a separate class. More importantly, it
+ * prevented the Cache and admin APIs from being used in the same VM.
+ * So, we refactored the code of those two subclasses into
+ * <code>DistributionManager</code>.
+ *
+ * @author David Whitlock
+ * @since 2.0
+ *
+ * @see DistributionMessage#process
+ * @see IgnoredByManager
+ */
+ public class DistributionManager
+ implements Externalizable, DM {
+
+ private static final Logger logger = LogService.getLogger();
+
+ private static final boolean SYNC_EVENTS =
+ Boolean.getBoolean("DistributionManager.syncEvents");
+
+ /**
+ * WARNING: setting this to true may break dunit tests.
+ * <p>see com.gemstone.gemfire.cache30.ClearMultiVmCallBkDUnitTest
+ */
+ public static final boolean INLINE_PROCESS =
+ !Boolean.getBoolean("DistributionManager.enqueueOrderedMessages");
+
+ /** Flag indicating whether to use multiple serial-executor threads (the default)
+ * or a single serial-executor thread (when the
+ * DistributionManager.singleSerialExecutor system property is true).
+ */
+ public static final boolean MULTI_SERIAL_EXECUTORS =
+ !Boolean.getBoolean("DistributionManager.singleSerialExecutor");
+
+ /** The name of the distribution manager (identifies it in GemFire) */
+ public static final String NAME = "GemFire";
+
+ /** The number of milliseconds to wait for distribution-related
+ * things to happen */
+ public static final long TIMEOUT =
+ Long.getLong("DistributionManager.TIMEOUT", -1).longValue();
+
+ public static final int PUSHER_THREADS =
+ Integer.getInteger("DistributionManager.PUSHER_THREADS", 50).intValue();
+
+ public static final int PUSHER_QUEUE_SIZE =
+ Integer.getInteger("DistributionManager.PUSHER_QUEUE_SIZE", 4096).intValue();
+
+
+ public static final int MAX_WAITING_THREADS =
+ Integer.getInteger("DistributionManager.MAX_WAITING_THREADS", Integer.MAX_VALUE).intValue();
+
+ public static final int MAX_PR_META_DATA_CLEANUP_THREADS =
+ Integer.getInteger("DistributionManager.MAX_PR_META_DATA_CLEANUP_THREADS", 1).intValue();
+
+ public static final int MAX_THREADS = Integer.getInteger("DistributionManager.MAX_THREADS", 100).intValue();
+ public static final int MAX_PR_THREADS = Integer.getInteger("DistributionManager.MAX_PR_THREADS", Math.max(Runtime.getRuntime().availableProcessors()*4, 16)).intValue();
+ public static final int MAX_FE_THREADS = Integer.getInteger("DistributionManager.MAX_FE_THREADS", Math.max(Runtime.getRuntime().availableProcessors()*4, 16)).intValue();
+ // Integer.getInteger("DistributionManager.MAX_THREADS", max(Runtime.getRuntime().availableProcessors()*2, 2)).intValue();
+
+ public static final int INCOMING_QUEUE_LIMIT =
+ Integer.getInteger("DistributionManager.INCOMING_QUEUE_LIMIT", 80000).intValue();
+ public static final int INCOMING_QUEUE_THROTTLE =
+ Integer.getInteger("DistributionManager.INCOMING_QUEUE_THROTTLE", (int)(INCOMING_QUEUE_LIMIT * 0.75)).intValue();
+
+ /** Throttling based on the Queue byte size */
+ // THROTTLE_PERCENT is a fraction, not a percentage: the
+ // SERIAL_QUEUE_THROTTLE_PERCENT property (a whole percentage, default 75)
+ // divided by 100, i.e. 0.75 by default.
+ public static final double THROTTLE_PERCENT =
+ (double) (Integer.getInteger("DistributionManager.SERIAL_QUEUE_THROTTLE_PERCENT", 75).intValue())/100;
+ // Default 40 MiB. A value of 0 makes the constructor use an unbounded
+ // OverflowQueueWithDMStats instead of a throttling queue for the serial
+ // executor. Presumably also the per-queue limit for the multi-serial
+ // executors — TODO confirm in SerialQueuedExecutorPool.
+ public static final int SERIAL_QUEUE_BYTE_LIMIT =
+ Integer.getInteger("DistributionManager.SERIAL_QUEUE_BYTE_LIMIT", (40 * (1024 * 1024))).intValue();
+ // Defaults to THROTTLE_PERCENT of SERIAL_QUEUE_BYTE_LIMIT.
+ public static final int SERIAL_QUEUE_THROTTLE =
+ Integer.getInteger("DistributionManager.SERIAL_QUEUE_THROTTLE", (int)(SERIAL_QUEUE_BYTE_LIMIT * THROTTLE_PERCENT)).intValue();
+ // Default 80 MiB; passed as the byte limit of the serial executor's
+ // ThrottlingMemLinkedQueueWithDMStats in the constructor.
+ public static final int TOTAL_SERIAL_QUEUE_BYTE_LIMIT =
+ Integer.getInteger("DistributionManager.TOTAL_SERIAL_QUEUE_BYTE_LIMIT", (80 * (1024 * 1024))).intValue();
+ // NOTE(review): this default is derived from SERIAL_QUEUE_BYTE_LIMIT
+ // (40 MiB * 0.75 = 30 MiB), not from TOTAL_SERIAL_QUEUE_BYTE_LIMIT
+ // (80 MiB * 0.75 = 60 MiB). That breaks the pattern used by
+ // SERIAL_QUEUE_THROTTLE above and looks like a copy/paste slip. Confirm
+ // before changing it, because the fix would move the default throttling
+ // point of the serial queue.
+ public static final int TOTAL_SERIAL_QUEUE_THROTTLE =
+ Integer.getInteger("DistributionManager.TOTAL_SERIAL_QUEUE_THROTTLE", (int)(SERIAL_QUEUE_BYTE_LIMIT * THROTTLE_PERCENT)).intValue();
+
+ /** Throttling based on the Queue item size */
+ public static final int SERIAL_QUEUE_SIZE_LIMIT =
+ Integer.getInteger("DistributionManager.SERIAL_QUEUE_SIZE_LIMIT", 20000).intValue();
+ public static final int SERIAL_QUEUE_SIZE_THROTTLE =
+ Integer.getInteger("DistributionManager.SERIAL_QUEUE_SIZE_THROTTLE", (int)(SERIAL_QUEUE_SIZE_LIMIT * THROTTLE_PERCENT)).intValue();
+
+ /** Max number of serial Queue executors, in case of multi-serial-queue executor */
+ public static final int MAX_SERIAL_QUEUE_THREAD =
+ Integer.getInteger("DistributionManager.MAX_SERIAL_QUEUE_THREAD", 20).intValue();
+
+ /**
+ * Whether or not to include link local addresses in the list of addresses we use
+ * to determine if two members are on the same host.
+ *
+ * Added for normura issue 7033 - they have duplicate link local addresses on different boxes
+ */
+ public static volatile boolean INCLUDE_LINK_LOCAL_ADDRESSES =
+ Boolean.getBoolean("gemfire.IncludeLinkLocalAddresses");
+
+ /** The DM type for regular distribution managers */
+ public static final int NORMAL_DM_TYPE = 10;
+
+ /** The DM type for locator distribution managers
+ * @since 7.0
+ */
+ public static final int LOCATOR_DM_TYPE = 11;
+
+ /** The DM type for Console (admin-only) distribution managers */
+ public static final int ADMIN_ONLY_DM_TYPE = 12;
+
+ public static final int LONER_DM_TYPE = 13;
+
+ /**
+ * an NIO priority type
+ * @see com.gemstone.gemfire.distributed.internal.PooledDistributionMessage
+ * @see #SERIAL_EXECUTOR
+ * @see #HIGH_PRIORITY_EXECUTOR
+ * @see #WAITING_POOL_EXECUTOR
+ */
+ public static final int STANDARD_EXECUTOR = 73;
+
+ /**
+ * an NIO priority type
+ *
+ * @see com.gemstone.gemfire.distributed.internal.SerialDistributionMessage
+ * @see #STANDARD_EXECUTOR
+ */
+ public static final int SERIAL_EXECUTOR = 74;
+
+ /**
+ * an NIO priority type
+
+ * @see com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage
+ * @see #STANDARD_EXECUTOR
+ */
+ public static final int HIGH_PRIORITY_EXECUTOR = 75;
+
+ // 76 not in use
+
+ /**
+ * an NIO priority type
+ *
+ * @see com.gemstone.gemfire.internal.cache.InitialImageOperation
+ * @see #STANDARD_EXECUTOR
+ */
+ public static final int WAITING_POOL_EXECUTOR = 77;
+
+ /**
+ * an NIO priority type
+ *
+ * @see com.gemstone.gemfire.internal.cache.InitialImageOperation
+ * @see #STANDARD_EXECUTOR
+ */
+ public static final int PARTITIONED_REGION_EXECUTOR = 78;
+
+
+ /**
+ * Executor for view related messages
+ *
+ * @see com.gemstone.gemfire.distributed.internal.membership.gms.messages.ViewAckMessage
+ * @see #STANDARD_EXECUTOR
+ */
+ public static final int VIEW_EXECUTOR = 79;
+
+
+ public static final int REGION_FUNCTION_EXECUTION_EXECUTOR = 80;
+
+ /** The number of open distribution managers in this VM */
+ private static int openDMs = 0;
+
+ // /** The stack trace of the last time a console DM was opened */
+ // private static Exception openStackTrace;
+
+ /** Is this VM dedicated to administration (like a GUI console or a
+ * JMX agent)? If so, then it creates {@link #ADMIN_ONLY_DM_TYPE}
+ * type distribution managers.
+ *
+ * @since 4.0
+ */
+ public static volatile boolean isDedicatedAdminVM = false;
+
+ /**
+ * Is this admin agent used for a command line console.
+ * This flag controls whether connect will throw
+ * an exception or just wait for a DS if one is not
+ * available. If true, we will throw an exception.
+ *
+ */
+ public static volatile boolean isCommandLineAdminVM = false;
+
+
+
+ ///////////////////// Instance Fields //////////////////////
+
+ /** The id of this distribution manager */
+ final protected InternalDistributedMember myid;
+
+ /** The distribution manager type of this dm; set in its constructor. */
+ private final int dmType;
+
+ /** The <code>MembershipListener</code>s that are registered on this
+ * manager. */
+ private final ConcurrentMap membershipListeners;
+
+ /** A lock to hold while adding and removing membership listeners */
+ protected final Object membershipListenersLock =
+ new MembershipListenersLock();
+ /** The <code>MembershipListener</code>s that are registered on this
+ * manager for ALL members.
+ * @since 5.7
+ */
+ protected volatile Set allMembershipListeners = Collections.EMPTY_SET;
+ /**
+ * A lock to hold while adding and removing all membership listeners.
+ * @since 5.7
+ */
+ protected final Object allMembershipListenersLock =
+ new MembershipListenersLock();
+ /** A queue of MemberEvent instances */
+ protected final BlockingQueue membershipEventQueue =
+ new LinkedBlockingQueue();
+ /** Used to invoke registered membership listeners in the background. */
+ private Thread memberEventThread;
+
+
+ /** A brief description of this DistributionManager */
+ protected final String description;
+
+ /** Statistics about distribution */
+ protected /*final*/ DistributionStats stats;
+
+ /** Did an exception occur in one of the DM threads? */
+ protected boolean exceptionInThreads;
+
+ static ThreadLocal isStartupThread = new ThreadLocal();
+
+ protected volatile boolean shutdownMsgSent = false;
+
+ /** Set to true when this manager is being shutdown */
+ protected volatile boolean closeInProgress = false;
+
+ private volatile boolean receivedStartupResponse = false;
+
+ private volatile String rejectionMessage = null;
+
+ protected MembershipManager membershipManager;
+
+ /** The channel through which distributed communication occurs. */
+ protected DistributionChannel channel;
+
+ /**
+ * The (non-admin-only) members of the distributed system. This is a
+ * map of memberid->memberid for fast access to canonical ID references.
+ * All accesses to this
+ * field must be synchronized on {@link #membersLock}.
+ */
+ private Map<InternalDistributedMember,InternalDistributedMember> members = Collections.emptyMap();
+ /**
+ * All (admin and non-admin) members of the distributed system. All accesses
+ * to this field must be synchronized on {@link #membersLock}.
+ */
+ private Set membersAndAdmin = Collections.emptySet();
+ /**
+ * Map of all locator members of the distributed system. The value is a
+ * collection of locator strings that are hosted in that member. All accesses
+ * to this field must be synchronized on {@link #membersLock}.
+ */
+ private Map<InternalDistributedMember, Collection<String>> hostedLocatorsAll = Collections.emptyMap();
+
+ /**
+ * Map of all locator members of the distributed system which have the shared configuration. The value is a
+ * collection of locator strings that are hosted in that member. All accesses
+ * to this field must be synchronized on {@link #membersLock}.
+ */
+ private Map<InternalDistributedMember, Collection<String>> hostedLocatorsWithSharedConfiguration = Collections.emptyMap();
+
+ /**
+ * Since 6.6.2 and hereafter we will save the versions here. But pre-6.6.2's
+ * StartupResponseMessage does not contain version. We will assign a default
+ * version for them.
+ */
+ public static final String DEFAULT_VERSION_PRE_6_6_2 = "6.6.0.0";
+ /**
+ * The lock held while accessing the field references to the following:<br>
+ * 1) {@link #members}<br>
+ * 2) {@link #membersAndAdmin}<br>
+ * 3) {@link #hostedLocatorsAll}<br>
+ * 4) {@link #hostedLocatorsWithSharedConfiguration}<br>
+ */
+ private final Object membersLock = new MembersLock();
+
+ /**
+ * The lock held while writing {@link #adminConsoles}.
+ */
+ private final Object adminConsolesLock = new Object();
+ /**
+ * The ids of all known admin consoles
+ * Uses Copy on Write. Writers must sync on adminConsolesLock.
+ * Readers don't need to sync.
+ */
+ private volatile Set<InternalDistributedMember> adminConsoles = Collections.emptySet();
+
+ /** The pusher thread */
+ //private Thread pusher;
+
+ /** The group of distribution manager threads */
+ protected LoggingThreadGroup threadGroup;
+
+ /** Message processing thread pool */
+ private ThreadPoolExecutor threadPool;
+
+ /** High Priority processing thread pool, used for initializing messages
+ * such as UpdateAttributes and CreateRegion messages
+ */
+ private ThreadPoolExecutor highPriorityPool;
+
+ /** Waiting Pool, used for messages that may have to wait on something.
+ * Use this separate pool with an unbounded queue so that waiting
+ * runnables don't get in the way of other processing threads.
+ * Used for threads that will most likely have to wait for a region to be
+ * finished initializing before it can proceed
+ */
+ private ThreadPoolExecutor waitingPool;
+
+ private ThreadPoolExecutor prMetaDataCleanupThreadPool;
+
+ /**
+ * Thread used to decouple {@link com.gemstone.gemfire.internal.cache.partitioned.PartitionMessage}s from
+ * {@link com.gemstone.gemfire.internal.cache.DistributedCacheOperation}s
+ * @see #SERIAL_EXECUTOR
+ */
+ private ThreadPoolExecutor partitionedRegionThread;
+ private ThreadPoolExecutor partitionedRegionPool;
+ private ThreadPoolExecutor functionExecutionThread;
+ private ThreadPoolExecutor functionExecutionPool;
+
+ /** Message processing executor for serial, ordered, messages. */
+ private ThreadPoolExecutor serialThread;
+
+ /** Message processing executor for view messages
+ * @see com.gemstone.gemfire.distributed.internal.membership.gms.messages.ViewAckMessage
+ */
+ private ThreadPoolExecutor viewThread;
+
+ /** If using a throttling queue for the serialThread, we cache the queue here
+ so we can see if delivery would block */
+ private ThrottlingMemLinkedQueueWithDMStats serialQueue;
+
+ /** a map keyed on InternalDistributedMember, to direct channels to other systems */
+ //protected final Map channelMap = CFactory.createCM();
+
+ protected volatile boolean readyForMessages = false;
+
+ /**
+ * Set to true once this DM is ready to send messages.
+ * Note that it is always ready to send the startup message.
+ */
+ private volatile boolean readyToSendMsgs = false;
+ private final Object readyToSendMsgsLock = new Object();
+
+ /** Is this distribution manager closed? */
+ protected volatile boolean closed = false;
+
+ /** The distributed system to which this distribution manager is
+ * connected. */
+ private InternalDistributedSystem system;
+
+ /** The remote transport configuration for this dm */
+ private RemoteTransportConfig transport;
+
+ /** The administration agent associated with this distribution
+ * manager. */
+ private volatile RemoteGfManagerAgent agent;
+
+ private SerialQueuedExecutorPool serialQueuedExecutorPool;
+
+ private final Semaphore parallelGIIs = new Semaphore(InitialImageOperation.MAX_PARALLEL_GIIS);
+
+ /**
+ * Map of InetAddress to HashSets of InetAddress, to define equivalences
+ * between network interface cards and hosts.
+ */
+ private final HashMap<InetAddress, Set<InetAddress>> equivalentHosts = new HashMap<InetAddress, Set<InetAddress>>();
+
+ private int distributedSystemId = DistributionConfig.DEFAULT_DISTRIBUTED_SYSTEM_ID;
+
+
+ private final Map<InternalDistributedMember, String> redundancyZones = Collections.synchronizedMap(new HashMap<InternalDistributedMember, String>());
+
+ private boolean enforceUniqueZone = false;
+
+ private volatile boolean isSharedConfigEnabledForDS = false;
+
+ /**
+ * Reports whether the shared configuration service is enabled for this
+ * distributed system.
+ *
+ * @return the current value of this manager's volatile
+ * {@code isSharedConfigEnabledForDS} flag
+ */
+ @Override
+ public boolean isSharedConfigurationServiceEnabledForDS() {
+   return this.isSharedConfigEnabledForDS;
+ }
+
+ /**
+ * Identifier for function execution threads and any of their children
+ */
+ public static final InheritableThreadLocal<Boolean> isFunctionExecutionThread = new InheritableThreadLocal<Boolean>() {
+ @Override
+ protected Boolean initialValue() {
+ return Boolean.FALSE;
+ }
+ };
+ ////////////////////// Static Methods //////////////////////
+
+ /**
+ * Reports whether two member ids were created for the same host address.
+ * <p>
+ * The comparison is a plain {@link InetAddress#equals} of the two ids'
+ * addresses. The first id's address must be non-null; otherwise a
+ * {@link NullPointerException} is thrown.
+ *
+ * @param id1 a DistributionManager id
+ * @param id2 a DistributionManager id
+ * @return true if both ids carry equal inet addresses, false otherwise
+ */
+ public static boolean isSameHost(InternalDistributedMember id1, InternalDistributedMember id2) {
+   final InetAddress firstAddress = id1.getInetAddress();
+   final InetAddress secondAddress = id2.getInetAddress();
+   return firstAddress.equals(secondAddress);
+ }
+
+ // @todo davidw Modify JGroups so that we do not have to send out a
+ // {@link StartupMessage}
+ /**
+ * Creates a new distribution manager and discovers the other members of the
+ * distributed system. Note that it does not check to see whether or not this
+ * VM already has a distribution manager.
+ *
+ * <p>The DM type is LOCATOR_DM_TYPE when the system property named by
+ * {@code InternalLocator.FORCE_LOCATOR_DM_TYPE} is true. Otherwise it is
+ * ADMIN_ONLY_DM_TYPE when {@link #isDedicatedAdminVM} is set, and
+ * NORMAL_DM_TYPE in all other cases. After construction, the new member's
+ * name is checked against members that precede it in the view. The member
+ * then adds itself, selects an elder, and sends a {@link StartupOperation}.
+ *
+ * @param system
+ * The distributed system to which this distribution manager
+ * will send messages.
+ * @return the started distribution manager
+ * @throws IncompatibleSystemException if an earlier member in the view uses
+ * this member's non-empty name and {@code verifyMember} confirms it,
+ * or if one is raised during the startup message exchange
+ * @throws InternalGemFireException if interrupted while waiting for the
+ * first startup response
+ * @throws RuntimeException any other runtime failure; a partially created
+ * DM is shut down with {@code uncleanShutdown(true)} before rethrowing
+ */
+ public static DistributionManager create(InternalDistributedSystem system)
+ {
+
+ DistributionManager dm = null;
+
+ try {
+
+ // Pick the kind of DM (locator, admin-only, or normal) for the transport.
+ int vmKind;
+
+ if (Boolean.getBoolean(InternalLocator.FORCE_LOCATOR_DM_TYPE)) {
+ // if this DM is starting for a locator, set it to be a locator DM
+ vmKind = LOCATOR_DM_TYPE;
+
+ } else if (isDedicatedAdminVM) {
+ vmKind = ADMIN_ONLY_DM_TYPE;
+
+ } else {
+ vmKind = NORMAL_DM_TYPE;
+ }
+
+ RemoteTransportConfig transport = new RemoteTransportConfig(system.getConfig(), vmKind);
+ transport.setIsReconnectingDS(system.isReconnectingDS());
+ transport.setOldDSMembershipInfo(system.oldDSMembershipInfo());
+
+ // Only used for the verbose "took N ms" startup log below.
+ long start = System.currentTimeMillis();
+
+ dm = new DistributionManager(system, transport);
+ dm.assertDistributionManagerType();
+
+ {
+ // Reject this member if an earlier member already uses the same
+ // non-empty "name" gemfire property.
+ InternalDistributedMember id = dm.getDistributionManagerId();
+ if (!"".equals(id.getName())) {
+ for (InternalDistributedMember m: (List<InternalDistributedMember>)dm.getViewMembers()) {
+ if (m.equals(id)) {
+ // I'm counting on the members returned by getViewMembers being ordered such that
+ // members that joined before us will precede us AND members that join after us
+ // will succeed us.
+ // SO once we find ourself break out of this loop.
+ break;
+ }
+ if (id.getName().equals(m.getName())) {
+ if (dm.getMembershipManager().verifyMember(m, "member is using the name of " + id)) {
+ throw new IncompatibleSystemException("Member " + id + " could not join this distributed system because the existing member " + m + " used the same name. Set the \"name\" gemfire property to a unique value.");
+ }
+ }
+ }
+ }
+ dm.addNewMember(id); // add ourselves
+ dm.selectElder(); // ShutdownException could be thrown here
+ }
+
+ // Send out a StartupMessage to the other members.
+ StartupOperation op = new StartupOperation(dm, transport);
+
+ try {
+ if (!dm.sendStartupMessage(op, true)) {
+ // Well, we didn't hear back from anyone else. If there are no other
+ // members we assume we're the first one; otherwise, when multicast is
+ // enabled, check whether multicast is responsive.
+ if (dm.getOtherDistributionManagerIds().size() == 0) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DistributionManager_DIDNT_HEAR_BACK_FROM_ANY_OTHER_SYSTEM_I_AM_THE_FIRST_ONE));
+ } else if (transport.isMcastEnabled()) {
+ // perform a multicast ping test
+ if (!dm.testMulticast()) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.DistributionManager_RECEIVED_NO_STARTUP_RESPONSES_BUT_OTHER_MEMBERS_EXIST_MULTICAST_IS_NOT_RESPONSIVE));
+ }
+ }
+ }
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ // This is ALWAYS bad; don't consult a CancelCriterion.
+ throw new InternalGemFireException(LocalizedStrings.DistributionManager_INTERRUPTED_WHILE_WAITING_FOR_FIRST_STARTUPRESPONSEMESSAGE.toLocalizedString(), ex);
+ } catch (IncompatibleSystemException ex) {
+ logger.fatal(ex.getMessage(), ex);
+ throw ex;
+ } finally {
+ // Mark the DM ready to send messages whether or not startup succeeded.
+ dm.readyToSendMsgs();
+ }
+
+ // NOTE(review): MembershipLogger.logStartup is nested inside this
+ // isInfoEnabled() guard, so it is skipped when info logging is off —
+ // confirm that is intended.
+ if (logger.isInfoEnabled()) {
+ long delta = System.currentTimeMillis() - start;
+ Object[] logArgs = new Object[] {
+ dm.getDistributionManagerId(),
+ transport,
+ Integer.valueOf(dm.getOtherDistributionManagerIds().size()),
+ dm.getOtherDistributionManagerIds(),
+ (logger.isInfoEnabled(LogMarker.DM) ? " (VERBOSE, took " + delta + " ms)" : ""),
+ ((dm.getDMType() == ADMIN_ONLY_DM_TYPE) ? " (admin only)" : (dm.getDMType() == LOCATOR_DM_TYPE) ? " (locator)" : "")
+ };
+ logger.info(LogMarker.DM, LocalizedMessage.create(
+ LocalizedStrings.DistributionManager_DISTRIBUTIONMANAGER_0_STARTED_ON_1_THERE_WERE_2_OTHER_DMS_3_4_5, logArgs));
+
+ MembershipLogger.logStartup(dm.getDistributionManagerId());
+ }
+ return dm;
+ }
+ catch (RuntimeException r) {
+ // Tear down a partially started DM so it does not linger, then rethrow.
+ if (dm != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("cleaning up incompletely started DistributionManager due to exception", r);
+ }
+ dm.uncleanShutdown(true);
+ }
+ throw r;
+ }
+ }
+
+ /**
+ * Runs {@code r} on the calling thread and absorbs anything it throws,
+ * except {@link VirtualMachineError}.
+ * <p>
+ * How each failure is handled:
+ * <ul>
+ * <li>A {@link CancelException} is logged at trace level only.</li>
+ * <li>A {@link VirtualMachineError} is reported to {@link SystemFailure} and
+ * rethrown.</li>
+ * <li>Any other throwable is logged at debug level while a close is in
+ * progress, and as a warning otherwise.</li>
+ * </ul>
+ *
+ * @param r the task to run
+ */
+ void runUntilShutdown(Runnable r) {
+   try {
+     r.run();
+   } catch (CancelException cancelled) {
+     if (logger.isTraceEnabled()) {
+       logger.trace("Caught shutdown exception", cancelled);
+     }
+   } catch (VirtualMachineError vmError) {
+     // The JVM is poisoned: report the failure and never let this thread go on.
+     SystemFailure.initiateFailure(vmError);
+     throw vmError;
+   } catch (Throwable failure) {
+     // A catch of Throwable must also check for a cascading VM failure
+     // before deciding the JVM is still usable.
+     SystemFailure.checkFailure();
+     if (!isCloseInProgress()) {
+       logger.warn(LocalizedMessage.create(LocalizedStrings.DistributionManager_TASK_FAILED_WITH_EXCEPTION), failure);
+     } else {
+       logger.debug("Caught unusual exception during shutdown: {}", failure.getMessage(), failure);
+     }
+   }
+ }
+
+ volatile Throwable rootCause = null;
+
+ /**
+ * Cancellation criterion for a {@link DistributionManager}.
+ * <p>
+ * Cancellation is reported once the manager has sent its shutdown message
+ * ({@code shutdownMsgSent}) or once a {@code rootCause} has been recorded.
+ */
+ private static class Stopper extends CancelCriterion {
+   private final DistributionManager dm;
+
+   Stopper(DistributionManager dm) {
+     this.dm = dm;
+   }
+
+   /**
+    * Returns a reason string if the manager is shutting down, or
+    * {@code null} if it is not.
+    */
+   @Override
+   public String cancelInProgress() {
+     checkFailure();
+
+     // A validateDM() cross-check against the cache's DM used to be called
+     // here. It was removed to fix bug 38356, because expiry threads hit it
+     // with spurious failures.
+
+     if (dm.shutdownMsgSent) {
+       return LocalizedStrings.DistributionManager__0_MESSAGE_DISTRIBUTION_HAS_TERMINATED.toLocalizedString(dm.toString());
+     }
+     // Read the volatile field once, so a concurrent change cannot pass the
+     // null check and then dereference null.
+     Throwable cause = dm.rootCause;
+     if (cause != null) {
+       return dm.toString() + ": " + cause.getMessage();
+     }
+
+     // Nope.
+     return null;
+   }
+
+   /**
+    * Builds a {@link DistributedSystemDisconnectedException} when a
+    * cancellation is in progress, or returns {@code null} if none is.
+    * <p>
+    * When both {@code e} and the manager's root cause are present, the root
+    * cause is attached to the tail of {@code e}'s cause chain. If the root
+    * cause is already somewhere in that chain, or the chain is cyclic, the
+    * chain is left untouched. Attaching it in those cases would create (or
+    * follow) a cause cycle, and later cause-chain walks would never end.
+    *
+    * @param e the exception reported by the caller; may be {@code null}
+    * @return the exception to throw, or {@code null} if not cancelled
+    */
+   @Override
+   public RuntimeException generateCancelledException(Throwable e) {
+     String reason = cancelInProgress();
+     if (reason == null) {
+       return null;
+     }
+     Throwable rc = dm.rootCause; // volatile read
+     if (rc == null) {
+       // No root cause, specify the one given and be done with it.
+       return new DistributedSystemDisconnectedException(reason, e);
+     }
+
+     if (e == null) {
+       // Caller did not specify any root cause, so just use our own.
+       return new DistributedSystemDisconnectedException(reason, rc);
+     }
+
+     // Walk to the tail of the caller's chain. Throwable does not override
+     // equals/hashCode, so this HashSet tracks visited nodes by identity.
+     Set<Throwable> seen = new HashSet<Throwable>();
+     Throwable nt = e;
+     while (true) {
+       if (nt == rc || !seen.add(nt)) {
+         // Root cause already in the chain, or the chain loops and has no
+         // tail: do not modify it.
+         return new DistributedSystemDisconnectedException(reason, e);
+       }
+       Throwable next = nt.getCause();
+       if (next == null) {
+         break;
+       }
+       nt = next;
+     }
+
+     try {
+       nt.initCause(rc);
+       return new DistributedSystemDisconnectedException(reason, e);
+     }
+     catch (IllegalStateException e2) {
+       // Bug 39496 (Jrockit related): the tail's cause was already
+       // initialized. Give up. The following error is not entirely sane but
+       // gives the correct general picture.
+       return new DistributedSystemDisconnectedException(reason, rc);
+     }
+   }
+ }
+ private final Stopper stopper = new Stopper(this);
+
+ /**
+ * Returns the cancellation criterion that reports whether this distribution
+ * manager is shutting down.
+ *
+ * @return this manager's {@code Stopper}, created when the manager was
+ * constructed; never {@code null}
+ */
+ public CancelCriterion getCancelCriterion() {
+   final CancelCriterion criterion = this.stopper;
+   return criterion;
+ }
+
+ /////////////////////// Constructors ///////////////////////
+
+ /**
+ * No-arg constructor, present only to satisfy {@link Externalizable}. It
+ * always throws and must never be used.
+ * <p>
+ * TODO: does this class really need to implement Externalizable? It seems to
+ * exist only for the old copy-sharing shared-memory support, which is no
+ * longer in GemFire.
+ *
+ * @throws IllegalAccessError always
+ */
+ public DistributionManager() {
+   // No final-field assignments are needed: this body always completes
+   // abruptly, and after a throw statement every blank final is definitely
+   // assigned (JLS chapter 16).
+   throw new IllegalAccessError("this constructor should never be invoked");
+ }
+
+ /**
+ * Creates a new <code>DistributionManager</code> by initializing
+ * itself, creating the membership manager and executors.
+ * <p>
+ * Builds every message-processing executor, creates the membership
+ * manager and the {@link DistributionChannel}, and logs the "starting"
+ * line. It does not call {@link #startThreads()}; the
+ * (system, transport) constructor does that after delegating here.
+ * <p>
+ * If anything throws before the end of the try block,
+ * {@code askThreadsToStop()} is called from the finally block
+ * (fix for bug 42039), presumably to stop any executors that were
+ * already created.
+ *
+ * @param transport
+ * The configuration for the communications transport; its VM kind
+ * becomes this manager's DM type
+ * @param system
+ * The distributed system that owns this manager; its configuration
+ * supplies the distributed-system id, the time-statistics flag and
+ * (when MULTI_SERIAL_EXECUTORS is set) the disable-tcp setting
+ */
+ private DistributionManager(RemoteTransportConfig transport,
+ InternalDistributedSystem system) {
+
+ this.dmType = transport.getVmKind();
+ this.system = system;
+ this.elderLock = new StoppableReentrantLock(stopper);
+ this.transport = transport;
+
+ this.membershipListeners = new ConcurrentHashMap();
+ this.distributedSystemId = system.getConfig().getDistributedSystemId();
+ // Create the DM statistics, using the OS process id as the stats id.
+ {
+ long statId = OSProcess.getId();
+ /* deadcoded since we don't know the channel id yet.
+ if (statId == 0 || statId == -1) {
+ statId = getChannelId();
+ }
+ */
+ this.stats = new DistributionStats(system, statId);
+ // enableClockStats is a static field, so this setting is JVM-wide.
+ DistributionStats.enableClockStats = system.getConfig().getEnableTimeStatistics();
+ }
+
+ this.exceptionInThreads = false;
+
+ // Build the processing executors. Every worker thread created below
+ // follows the same pattern: record the start and bump its live-thread
+ // stat, call ConnectionTable.threadWantsSharedResources() and
+ // Connection.makeReaderThread(), run the task via runUntilShutdown(),
+ // and in a finally release its sockets and decrement the stat.
+ // All worker threads are daemon threads in this thread group.
+ final LoggingThreadGroup group =
+ LoggingThreadGroup.createThreadGroup("DistributionManager Threads", logger);
+ this.threadGroup = group;
+
+ // Set to true only once every step below has succeeded; see finally.
+ boolean finishedConstructor = false;
+ try {
+
+ // Optional pool of serial executors, enabled by MULTI_SERIAL_EXECUTORS.
+ if (MULTI_SERIAL_EXECUTORS) {
+ if (logger.isInfoEnabled(LogMarker.DM)) {
+ logger.info(LogMarker.DM,
+ "Serial Queue info :" +
+ " THROTTLE_PERCENT: " + THROTTLE_PERCENT +
+ " SERIAL_QUEUE_BYTE_LIMIT :" + SERIAL_QUEUE_BYTE_LIMIT +
+ " SERIAL_QUEUE_THROTTLE :" + SERIAL_QUEUE_THROTTLE +
+ " TOTAL_SERIAL_QUEUE_BYTE_LIMIT :" + TOTAL_SERIAL_QUEUE_BYTE_LIMIT +
+ " TOTAL_SERIAL_QUEUE_THROTTLE :" + TOTAL_SERIAL_QUEUE_THROTTLE +
+ " SERIAL_QUEUE_SIZE_LIMIT :" + SERIAL_QUEUE_SIZE_LIMIT +
+ " SERIAL_QUEUE_SIZE_THROTTLE :" + SERIAL_QUEUE_SIZE_THROTTLE
+ );
+ }
+ // when TCP/IP is disabled we can't throttle the serial queue or we run the risk of
+ // distributed deadlock when we block the UDP reader thread
+ boolean throttlingDisabled = system.getConfig().getDisableTcp();
+ this.serialQueuedExecutorPool = new SerialQueuedExecutorPool(this.threadGroup, this.stats, throttlingDisabled);
+ }
+
+ // Serial executor (serialThread). A plain overflow queue is used when
+ // SERIAL_QUEUE_BYTE_LIMIT is 0; otherwise a throttling queue, which is
+ // also stored in this.serialQueue.
+ // NOTE(review): the guard tests SERIAL_QUEUE_BYTE_LIMIT, but the queue is
+ // sized with TOTAL_SERIAL_QUEUE_BYTE_LIMIT / TOTAL_SERIAL_QUEUE_THROTTLE;
+ // confirm that this is intended.
+ {
+ BlockingQueue poolQueue;
+ if (SERIAL_QUEUE_BYTE_LIMIT == 0) {
+ poolQueue = new OverflowQueueWithDMStats(this.stats.getSerialQueueHelper());
+ } else {
+ this.serialQueue = new ThrottlingMemLinkedQueueWithDMStats(TOTAL_SERIAL_QUEUE_BYTE_LIMIT,
+ TOTAL_SERIAL_QUEUE_THROTTLE, SERIAL_QUEUE_SIZE_LIMIT, SERIAL_QUEUE_SIZE_THROTTLE,
+ this.stats.getSerialQueueHelper());
+ poolQueue = this.serialQueue;
+ }
+ ThreadFactory tf = new ThreadFactory() {
+ public Thread newThread(final Runnable command) {
+ DistributionManager.this.stats.incSerialThreadStarts();
+ final Runnable r = new Runnable() {
+ public void run() {
+ DistributionManager.this.stats.incNumSerialThreads(1);
+ try {
+ ConnectionTable.threadWantsSharedResources();
+ Connection.makeReaderThread();
+ runUntilShutdown(command);
+ // command.run();
+ } finally {
+ ConnectionTable.releaseThreadsSockets();
+ DistributionManager.this.stats.incNumSerialThreads(-1);
+ }
+ }
+ };
+ Thread thread = new Thread(group, r, LocalizedStrings.DistributionManager_SERIAL_MESSAGE_PROCESSOR.toLocalizedString());
+ thread.setDaemon(true);
+ return thread;
+ }
+ };
+ SerialQueuedExecutorWithDMStats executor = new SerialQueuedExecutorWithDMStats(poolQueue,
+ this.stats.getSerialProcessorHelper(), tf);
+ this.serialThread = executor;
+ }
+ // Serial executor backing viewThread, with an unbounded LinkedBlockingQueue.
+ {
+ BlockingQueue q = new LinkedBlockingQueue();
+ ThreadFactory tf = new ThreadFactory() {
+ public Thread newThread(final Runnable command) {
+ DistributionManager.this.stats.incViewThreadStarts();
+ final Runnable r = new Runnable() {
+ public void run() {
+ DistributionManager.this.stats.incNumViewThreads(1);
+ try {
+ ConnectionTable.threadWantsSharedResources();
+ Connection.makeReaderThread();
+ runUntilShutdown(command);
+ } finally {
+ ConnectionTable.releaseThreadsSockets();
+ DistributionManager.this.stats.incNumViewThreads(-1);
+ }
+ }
+ };
+ Thread thread = new Thread(group, r, LocalizedStrings.DistributionManager_VIEW_MESSAGE_PROCESSOR.toLocalizedString());
+ thread.setDaemon(true);
+ return thread;
+ }
+ };
+ this.viewThread = new SerialQueuedExecutorWithDMStats(q,
+ this.stats.getViewProcessorHelper(), tf);
+ }
+
+ // Normal pooled executor (threadPool), with MAX_THREADS as the pool size.
+ // The queue is built with INCOMING_QUEUE_LIMIT unless that limit is 0
+ // (presumably meaning "unbounded").
+ {
+ BlockingQueue poolQueue;
+ if (INCOMING_QUEUE_LIMIT == 0) {
+ poolQueue = new OverflowQueueWithDMStats(this.stats.getOverflowQueueHelper());
+ } else {
+ poolQueue = new OverflowQueueWithDMStats(INCOMING_QUEUE_LIMIT, this.stats.getOverflowQueueHelper());
+ }
+ ThreadFactory tf = new ThreadFactory() {
+ private int next = 0;
+
+ public Thread newThread(final Runnable command) {
+ DistributionManager.this.stats.incProcessingThreadStarts();
+ final Runnable r = new Runnable() {
+ public void run() {
+ DistributionManager.this.stats.incNumProcessingThreads(1);
+ try {
+ ConnectionTable.threadWantsSharedResources();
+ Connection.makeReaderThread();
+ runUntilShutdown(command);
+ } finally {
+ ConnectionTable.releaseThreadsSockets();
+ DistributionManager.this.stats.incNumProcessingThreads(-1);
+ }
+ }
+ };
+ Thread thread = new Thread(group, r,
+ LocalizedStrings.DistributionManager_POOLED_MESSAGE_PROCESSOR.toLocalizedString() + (next++));
+ thread.setDaemon(true);
+ return thread;
+ }
+ };
+ ThreadPoolExecutor pool =
+ new PooledExecutorWithDMStats(poolQueue, MAX_THREADS, this.stats.getNormalPoolHelper(), tf);
+ this.threadPool = pool;
+ }
+
+
+ // High-priority pooled executor; same queue rules as threadPool.
+ {
+ BlockingQueue poolQueue;
+ if (INCOMING_QUEUE_LIMIT == 0) {
+ poolQueue = new OverflowQueueWithDMStats(this.stats.getHighPriorityQueueHelper());
+ } else {
+ poolQueue = new OverflowQueueWithDMStats(INCOMING_QUEUE_LIMIT, this.stats.getHighPriorityQueueHelper());
+ }
+ ThreadFactory tf = new ThreadFactory() {
+ private int next = 0;
+
+ public Thread newThread(final Runnable command) {
+ DistributionManager.this.stats.incHighPriorityThreadStarts();
+ final Runnable r = new Runnable() {
+ public void run() {
+ DistributionManager.this.stats.incHighPriorityThreads(1);
+ try {
+ ConnectionTable.threadWantsSharedResources();
+ Connection.makeReaderThread();
+ runUntilShutdown(command);
+ } finally {
+ ConnectionTable.releaseThreadsSockets();
+ DistributionManager.this.stats.incHighPriorityThreads(-1);
+ }
+ }
+ };
+ Thread thread = new Thread(group, r,
+ LocalizedStrings.DistributionManager_POOLED_HIGH_PRIORITY_MESSAGE_PROCESSOR.toLocalizedString() + (next++));
+ thread.setDaemon(true);
+ return thread;
+ }
+ };
+ this.highPriorityPool = new PooledExecutorWithDMStats(poolQueue, MAX_THREADS, this.stats.getHighPriorityPoolHelper(), tf);
+ }
+
+
+ // Waiting pool. When MAX_WAITING_THREADS is Integer.MAX_VALUE, a
+ // SynchronousQueue is used (direct hand-off, no queueing).
+ {
+ ThreadFactory tf = new ThreadFactory() {
+ private int next = 0;
+
+ public Thread newThread(final Runnable command) {
+ DistributionManager.this.stats.incWaitingThreadStarts();
+ final Runnable r = new Runnable() {
+ public void run() {
+ DistributionManager.this.stats.incWaitingThreads(1);
+ try {
+ ConnectionTable.threadWantsSharedResources();
+ Connection.makeReaderThread();
+ runUntilShutdown(command);
+ } finally {
+ ConnectionTable.releaseThreadsSockets();
+ DistributionManager.this.stats.incWaitingThreads(-1);
+ }
+ }
+ };
+ Thread thread = new Thread(group, r,
+ LocalizedStrings.DistributionManager_POOLED_WAITING_MESSAGE_PROCESSOR.toLocalizedString() + (next++));
+ thread.setDaemon(true);
+ return thread;
+ }
+ };
+ BlockingQueue poolQueue;
+ if (MAX_WAITING_THREADS == Integer.MAX_VALUE) {
+ // no need for a queue since we have infinite threads
+ poolQueue = new SynchronousQueue();
+ } else {
+ poolQueue = new OverflowQueueWithDMStats(this.stats.getWaitingQueueHelper());
+ }
+ this.waitingPool = new PooledExecutorWithDMStats(poolQueue,
+ MAX_WAITING_THREADS,
+ this.stats.getWaitingPoolHelper(),
+ tf);
+ }
+
+ // Partitioned-region metadata cleanup pool.
+ // NOTE(review): this pool reuses the waiting-pool statistics (thread
+ // counters, queue helper and pool helper), so its activity is reported
+ // as waiting-pool activity. The "will it be ok?" comments below question
+ // the same thing.
+ {
+ ThreadFactory tf = new ThreadFactory() {
+ private int next = 0;
+
+ public Thread newThread(final Runnable command) {
+ DistributionManager.this.stats.incWaitingThreadStarts();//will it be ok?
+ final Runnable r = new Runnable() {
+ public void run() {
+ DistributionManager.this.stats.incWaitingThreads(1);//will it be ok
+ try {
+ ConnectionTable.threadWantsSharedResources();
+ Connection.makeReaderThread();
+ runUntilShutdown(command);
+ } finally {
+ ConnectionTable.releaseThreadsSockets();
+ DistributionManager.this.stats.incWaitingThreads(-1);
+ }
+ }
+ };
+ Thread thread = new Thread(group, r,
+ LocalizedStrings.DistributionManager_PR_META_DATA_CLEANUP_MESSAGE_PROCESSOR.toLocalizedString() + (next++));
+ thread.setDaemon(true);
+ return thread;
+ }
+ };
+ BlockingQueue poolQueue;
+ poolQueue = new OverflowQueueWithDMStats(this.stats.getWaitingQueueHelper());
+ this.prMetaDataCleanupThreadPool = new PooledExecutorWithDMStats(poolQueue,
+ MAX_PR_META_DATA_CLEANUP_THREADS,
+ this.stats.getWaitingPoolHelper(),
+ tf);
+ }
+
+ // Partitioned-region executor. It is pooled when MAX_PR_THREADS > 1 and
+ // otherwise a single serial executor, so exactly one of
+ // partitionedRegionPool / partitionedRegionThread is assigned here.
+ {
+ BlockingQueue poolQueue;
+ if (INCOMING_QUEUE_LIMIT == 0) {
+ poolQueue = new OverflowQueueWithDMStats(this.stats.getPartitionedRegionQueueHelper());
+ } else {
+ poolQueue = new OverflowQueueWithDMStats(INCOMING_QUEUE_LIMIT, this.stats.getPartitionedRegionQueueHelper());
+ }
+ ThreadFactory tf = new ThreadFactory() {
+ private int next = 0;
+
+ public Thread newThread(final Runnable command) {
+ DistributionManager.this.stats.incPartitionedRegionThreadStarts();
+ final Runnable r = new Runnable() {
+ public void run() {
+ stats.incPartitionedRegionThreads(1);
+ try {
+ ConnectionTable.threadWantsSharedResources();
+ Connection.makeReaderThread();
+ runUntilShutdown(command);
+ } finally {
+ ConnectionTable.releaseThreadsSockets();
+ stats.incPartitionedRegionThreads(-1);
+ }
+ }
+ };
+ Thread thread = new Thread(group, r,
+ "PartitionedRegion Message Processor" + (next++));
+ thread.setDaemon(true);
+ return thread;
+ }
+ };
+ if (MAX_PR_THREADS > 1) {
+ this.partitionedRegionPool = new PooledExecutorWithDMStats(poolQueue,
+ MAX_PR_THREADS, this.stats.getPartitionedRegionPoolHelper(), tf);
+ } else {
+ SerialQueuedExecutorWithDMStats executor = new SerialQueuedExecutorWithDMStats(poolQueue,
+ this.stats.getPartitionedRegionPoolHelper(), tf);
+ this.partitionedRegionThread = executor;
+ }
+
+ }
+
+ // Function-execution executor. It makes the same pooled-vs-serial choice,
+ // driven by MAX_FE_THREADS. Its workers also set the
+ // isFunctionExecutionThread flag to TRUE before running.
+ {
+ BlockingQueue poolQueue;
+ if (INCOMING_QUEUE_LIMIT == 0) {
+ poolQueue = new OverflowQueueWithDMStats(this.stats.getFunctionExecutionQueueHelper());
+ } else {
+ poolQueue = new OverflowQueueWithDMStats(INCOMING_QUEUE_LIMIT, this.stats.getFunctionExecutionQueueHelper());
+ }
+ ThreadFactory tf = new ThreadFactory() {
+ private int next = 0;
+
+ public Thread newThread(final Runnable command) {
+ DistributionManager.this.stats.incFunctionExecutionThreadStarts();
+ final Runnable r = new Runnable() {
+ public void run() {
+ stats.incFunctionExecutionThreads(1);
+ isFunctionExecutionThread.set(Boolean.TRUE);
+ try {
+ ConnectionTable.threadWantsSharedResources();
+ Connection.makeReaderThread();
+ runUntilShutdown(command);
+ } finally {
+ ConnectionTable.releaseThreadsSockets();
+ stats.incFunctionExecutionThreads(-1);
+ }
+ }
+ };
+ Thread thread = new Thread(group, r,
+ "Function Execution Processor" + (next++));
+ thread.setDaemon(true);
+ return thread;
+ }
+ };
+
+ if(MAX_FE_THREADS > 1){
+ this.functionExecutionPool = new FunctionExecutionPooledExecutor(poolQueue,
+ MAX_FE_THREADS, this.stats.getFunctionExecutionPoolHelper(), tf,true /*for fn exec*/);
+ } else {
+ SerialQueuedExecutorWithDMStats executor = new SerialQueuedExecutorWithDMStats(poolQueue,
+ this.stats.getFunctionExecutionPoolHelper(), tf);
+ this.functionExecutionThread = executor;
+ }
+
+ }
+
+ // Unless SYNC_EVENTS is set, membership events go to a dedicated daemon
+ // thread. It is only created here; startThreads() starts it.
+ if (!SYNC_EVENTS) {
+ this.memberEventThread = new Thread(group, new MemberEventInvoker(),
+ "DM-MemberEventInvoker");
+ this.memberEventThread.setDaemon(true);
+ }
+
+ StringBuffer sb = new StringBuffer(" (took ");
+
+ // NOTE(review): this value of start is overwritten below before it is
+ // ever read.
+ long start = System.currentTimeMillis();
+
+ // Create direct channel first
+ // DirectChannel dc = new DirectChannel(new MyListener(this), system.getConfig(), logger, null);
+ // setDirectChannelPort(dc.getPort()); // store in a thread local
+
+ // Create the membership manager (historically "connect to JGroups"),
+ // timing it for the startup log message.
+ start = System.currentTimeMillis();
+
+ MyListener l = new MyListener(this);
+ membershipManager = MemberFactory.newMembershipManager(l, system.getConfig(), transport, stats);
+
+ sb.append(System.currentTimeMillis() - start);
+
+ this.myid = membershipManager.getLocalMember();
+
+ // dc.patchUpAddress(this.myid);
+ // id.setDirectChannelPort(dc.getPort());
+
+ // create the distribution channel
+ this.channel = new DistributionChannel(membershipManager);
+
+ membershipManager.postConnect();
+
+ //Assert.assertTrue(this.getChannelMap().size() >= 1);
+ // System.out.println("Channel Map:");
+ // for (Iterator iter = this.getChannelMap().entrySet().iterator();
+ // iter.hasNext(); ) {
+ // Map.Entry entry = (Map.Entry) iter.next();
+ // Object key = entry.getKey();
+ // System.out.println(" " + key + " a " +
+ // key.getClass().getName() + " -> " +
+ // entry.getValue());
+ // }
+
+ sb.append(" ms)");
+
+ // The elapsed time is included only when DM info logging is enabled.
+ logger.info(LocalizedMessage.create(LocalizedStrings.DistributionManager_STARTING_DISTRIBUTIONMANAGER_0_1,
+ new Object[] { this.myid, (logger.isInfoEnabled(LogMarker.DM) ? sb.toString() : "")}));
+
+ this.description = NAME + " on " + this.myid + " started at "
+ + (new Date(System.currentTimeMillis())).toString();
+
+ finishedConstructor = true;
+ } finally {
+ // Reached with finishedConstructor still false only if something threw.
+ if (!finishedConstructor) {
+ askThreadsToStop(); // fix for bug 42039
+ }
+ }
+ }
+
+ /**
+ * Creates a new distribution manager and starts it.
+ * <p>
+ * Delegates object construction to the (transport, system) constructor,
+ * then calls {@link #startThreads()}, enables membership event processing
+ * and blocks in {@code membershipManager.waitForEventProcessing()}. On
+ * success the {@code openDMs} counter (guarded by
+ * {@code DistributionManager.class}) is incremented. If anything throws,
+ * {@code askThreadsToStop()} is called (fix for bug 42039).
+ *
+ * @param system
+ * The distributed system to which this distribution manager
+ * will send messages.
+ * @param transport
+ * The configuration for the communications transport
+ */
+ private DistributionManager(
+ InternalDistributedSystem system,
+ RemoteTransportConfig transport)
+ {
+ this(transport, system);
+
+ // Set to true only once startup has fully succeeded; see finally.
+ boolean finishedConstructor = false;
+ try {
+
+ // Mark this thread as a startup thread so that
+ // waitUntilReadyToSendMsgs() lets it send before the DM is ready.
+ isStartupThread.set(Boolean.TRUE);
+
+ startThreads();
+
+ // Since we need a StartupResponseMessage to make sure licenses
+ // are compatible the following has been deadcoded.
+ // // For the time being, invoke processStartupResponse()
+ // String rejectionMessage = null;
+ // if (GemFireVersion.getGemFireVersion().
+ // equals(state.getGemFireVersion())) {
+ // rejectionMessage = "Rejected new system node " +
+ // this.getDistributionManagerId() + " with version \"" +
+ // GemFireVersion.getGemFireVersion() +
+ // "\" because the distributed system's version is \"" +
+ // state.getGemFireVersion() + "\".";
+ // }
+ // this.processStartupResponse(state.getCacheTime(),
+ // rejectionMessage);
+
+ // Allow events to start being processed.
+ membershipManager.startEventProcessing();
+ // Wait for event processing. An interrupt does not abort the wait: it
+ // is remembered, the wait is retried, and the thread's interrupt status
+ // is restored in the finally clause. Cancellation is checked on every
+ // pass.
+ for (;;) {
+ this.getCancelCriterion().checkCancelInProgress(null);
+ boolean interrupted = Thread.interrupted();
+ try {
+ membershipManager.waitForEventProcessing();
+ break;
+ }
+ catch (InterruptedException e) {
+ interrupted = true;
+ }
+ finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ synchronized (DistributionManager.class) {
+ openDMs++;
+ }
+ finishedConstructor = true;
+ } finally {
+ if (!finishedConstructor) {
+ askThreadsToStop(); // fix for bug 42039
+ }
+ }
+ }
+
+ //////////////////// Instance Methods /////////////////////
+
+ /**
+  * Tells whether two members run on the same host. The check uses the
+  * table of equivalent addresses (all NICs) gathered from startup messages:
+  * the second member's address must appear among the first member's
+  * equivalents.
+  *
+  * @param member1 First member
+  * @param member2 Second member
+  * @return true if the members' addresses are equivalent
+  */
+ public boolean areOnEquivalentHost(InternalDistributedMember member1,
+     InternalDistributedMember member2) {
+   final Set<InetAddress> hostsOfFirst = getEquivalents(member1.getInetAddress());
+   final InetAddress addressOfSecond = member2.getInetAddress();
+   return hostsOfFirst.contains(addressOfSecond);
+ }
+
+ /**
+  * Set the host equivalencies for a given host. This overrides any
+  * previous information in the tables.
+  * <p>
+  * Every address in {@code equivs} is mapped to one shared, read-only
+  * snapshot of the set. The snapshot is copied up front, so later changes
+  * the caller makes to {@code equivs} are not visible through
+  * {@link #getEquivalents(InetAddress)}, which hands the stored set out
+  * after releasing the lock.
+  *
+  * @param equivs set of InetAddresses that all point at the same host
+  */
+ public void setEquivalentHosts(Set<InetAddress> equivs) {
+   // One immutable copy shared by all entries, instead of a fresh wrapper
+   // around the caller's (mutable) set for every element.
+   final Set<InetAddress> snapshot =
+       Collections.unmodifiableSet(new HashSet<InetAddress>(equivs));
+   synchronized (equivalentHosts) {
+     for (InetAddress address : snapshot) {
+       equivalentHosts.put(address, snapshot);
+     }
+   }
+ }
+
+ /**
+  * Returns a point-in-time copy of the host-equivalence table, taken while
+  * holding the table's monitor. Changes to the copy do not affect this
+  * manager.
+  */
+ public HashMap<InetAddress, Set<InetAddress>> getEquivalentHostsSnapshot() {
+   final HashMap<InetAddress, Set<InetAddress>> copy;
+   synchronized (this.equivalentHosts) {
+     copy = new HashMap<InetAddress, Set<InetAddress>>(this.equivalentHosts);
+   }
+   return copy;
+ }
+
+ /**
+  * Returns every InetAddress recorded as equivalent to {@code in} (same
+  * host). If nothing is recorded for {@code in}, a singleton set holding
+  * just {@code in} is returned.
+  *
+  * @param in host to match up
+  * @return all the addresses equivalent to {@code in}
+  */
+ public Set<InetAddress> getEquivalents(InetAddress in) {
+   final Set<InetAddress> known;
+   synchronized (equivalentHosts) {
+     known = equivalentHosts.get(in);
+   }
+   // DS 11/25/08 - with a VPN the member id can carry the VPN address,
+   // which does not show up in the equivalents, hence the fallback.
+   return (known != null) ? known : Collections.singleton(in);
+ }
+
+ /**
+  * Records the redundancy zone reported by {@code member}. For members
+  * other than this one, also logs whether that member is in the same zone
+  * as this member.
+  * <p>
+  * A {@code null} or empty zone name is not recorded.
+  *
+  * @param member the member whose zone is being recorded
+  * @param redundancyZone the zone name; may be {@code null} or empty
+  */
+ public void setRedundancyZone(InternalDistributedMember member, String redundancyZone) {
+   if (redundancyZone != null && !redundancyZone.isEmpty()) {
+     this.redundancyZones.put(member, redundancyZone);
+   }
+   final InternalDistributedMember self = getDistributionManagerId();
+   // Compare ids by value, not by reference: an equal but distinct id
+   // object for this member must not produce a log entry about ourselves.
+   final boolean isSelf = (member == self) || (member != null && member.equals(self));
+   if (!isSelf) {
+     String relationship = areInSameZone(self, member) ? ""
+         : "not ";
+     Object[] logArgs = new Object[] { member, relationship };
+     logger.info(LocalizedMessage.create(LocalizedStrings.DistributionManager_DISTRIBUTIONMANAGER_MEMBER_0_IS_1_EQUIVALENT, logArgs));
+   }
+ }
+
+ /**
+  * Turns on unique-zone enforcement when {@code enforceUniqueZone} is
+  * true. The setting is sticky: once enabled, passing false leaves it on.
+  */
+ public void setEnforceUniqueZone(boolean enforceUniqueZone) {
+   if (enforceUniqueZone) {
+     this.enforceUniqueZone = true;
+   }
+ }
+
+ /**
+  * Returns whether unique-zone enforcement has been turned on via
+  * {@link #setEnforceUniqueZone(boolean)}.
+  */
+ public boolean enforceUniqueZone() {
+   return this.enforceUniqueZone;
+ }
+
+ /**
+  * Returns the redundancy zone recorded for {@code member}, or
+  * {@code null} if none was recorded.
+  */
+ public String getRedundancyZone(InternalDistributedMember member) {
+   final String zone = this.redundancyZones.get(member);
+   return zone;
+ }
+
+ /**
+ * Asserts that distributionManagerType is LOCAL, GEMFIRE, or
+ * ADMIN_ONLY. Also asserts that the distributionManagerId
+ * (jgroups DistributedMember) has a VmKind that matches.
+ */
+ private void assertDistributionManagerType() {
+ // Assert that dmType is one of the three DM types...
+ int theDmType = getDMType();
+ switch (theDmType) {
+ case NORMAL_DM_TYPE:
+ case LONER_DM_TYPE:
+ case ADMIN_ONLY_DM_TYPE:
+ case LOCATOR_DM_TYPE:
+ break;
+ default:
+ Assert.assertTrue(false, "unknown distribution manager type");
+ }
+
+ // Assert InternalDistributedMember VmKind matches this DistributionManagerType...
+ final InternalDistributedMember theId = getDistributionManagerId();
+ final int vmKind = theId.getVmKind();
+ if (theDmType != vmKind) {
+ Assert.assertTrue(false,
+ "InternalDistributedMember has a vmKind of " + vmKind +
+ " instead of " + theDmType);
+ }
+ }
+
+ /**
+  * Returns this manager's DM type, taken from the transport's VM kind
+  * at construction.
+  */
+ public int getDMType() {
+   return dmType;
+ }
+
+ /**
+  * Returns the members of the current membership view. An empty view's
+  * member list is returned if there is no channel, no membership manager
+  * or no view yet.
+  */
+ public List<InternalDistributedMember> getViewMembers() {
+   final DistributionChannel ch = this.channel;
+   final MembershipManager mgr = (ch == null) ? null : ch.getMembershipManager();
+   NetView view = (mgr == null) ? null : mgr.getView();
+   if (view == null) {
+     view = new NetView();
+   }
+   return view.getMembers();
+ }
+ /**
+  * Implementation of DM.getOldestMember. Walks the current view in order
+  * and returns the first element of {@code c} that equals a view member.
+  * The view order is treated as age order, per the method's name.
+  *
+  * @throws NoSuchElementException if no element of {@code c} is in the view
+  */
+ public DistributedMember getOldestMember(Collection c) throws NoSuchElementException {
+   final List<InternalDistributedMember> view = getViewMembers();
+   for (int pos = 0; pos < view.size(); pos++) {
+     final InternalDistributedMember viewMbr = view.get(pos);
+     for (Object candidate : c) {
+       if (viewMbr.equals(candidate)) {
+         return (DistributedMember) candidate;
+       }
+     }
+   }
+   throw new NoSuchElementException(LocalizedStrings.DistributionManager_NONE_OF_THE_GIVEN_MANAGERS_IS_IN_THE_CURRENT_MEMBERSHIP_VIEW.toLocalizedString());
+ }
+
+ private boolean testMulticast() {
+ return this.membershipManager.testMulticast();
+ }
+
+ /**
+  * Renders a membership view (list of {@link InternalDistributedMember}s)
+  * for logging.
+  *
+  * @param v the view, may be null
+  * @return {@code v.toString()}, or the string "null" when {@code v} is null
+  */
+ public static String printView(NetView v) {
+   return String.valueOf(v);
+ }
+
+ /**
+ * Need to do this outside the constructor so that the child
+ * constructor can finish.
+ */
+ protected void startThreads() {
+ this.system.setDM(this); // fix for bug 33362
+ if (this.memberEventThread != null)
+ this.memberEventThread.start();
+ try {
+
+ // And the distinguished guests today are...
+ NetView v = membershipManager.getView();
+ logger.info(LocalizedMessage.create(LocalizedStrings.DistributionManager_INITIAL_MEMBERSHIPMANAGER_VIEW___0, printView(v)));
+
+ // Add them all to our view
+ Iterator<InternalDistributedMember> it = v.getMembers().iterator();
+ while (it.hasNext()) {
+ addNewMember(it.next());
+ }
+
+ // Figure out who the elder is...
+ selectElder(); // ShutdownException could be thrown here
+ } catch (Exception ex) {
+ throw new InternalGemFireException(LocalizedStrings.DistributionManager_COULD_NOT_PROCESS_INITIAL_VIEW.toLocalizedString(), ex);
+ }
+ try {
+ getWaitingThreadPool().execute(new Runnable() {
+ public void run() {
+ // call in background since it might need to send a reply
+ // and we are not ready to send messages until startup is finished
+ isStartupThread.set(Boolean.TRUE);
+ readyForMessages();
+ }
+ });
+ }
+ catch (VirtualMachineError err) {
+ SystemFailure.initiateFailure(err);
+ // If this ever returns, rethrow the error. We're poisoned
+ // now, so don't let this thread continue.
+ throw err;
+ }
+ catch (Throwable t) {
+ // Whenever you catch Error or Throwable, you must also
+ // catch VirtualMachineError (see above). However, there is
+ // _still_ a possibility that you are dealing with a cascading
+ // error condition, so you also need to check to see if the JVM
+ // is still usable:
+ SystemFailure.checkFailure();
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributionManager_UNCAUGHT_EXCEPTION_CALLING_READYFORMESSAGES), t);
+ }
+ }
+
+ /**
+  * Marks this DM as ready for messages, wakes any thread blocked in
+  * {@link #waitUntilReadyForMessages()}, and then starts membership event
+  * processing.
+  */
+ protected void readyForMessages() {
+   synchronized (this) {
+     readyForMessages = true;
+     notifyAll();
+   }
+   membershipManager.startEventProcessing();
+ }
+
+ /**
+  * Blocks until {@link #readyForMessages()} has run. Cancellation is
+  * checked before every wait. An interrupt is remembered and re-asserted
+  * on the thread rather than ending the wait.
+  */
+ protected void waitUntilReadyForMessages() {
+   if (readyForMessages) {
+     return; // fast path: no need to take the monitor
+   }
+   synchronized (this) {
+     while (!readyForMessages) {
+       stopper.checkCancelInProgress(null);
+       boolean interrupted = Thread.interrupted();
+       try {
+         this.wait();
+       } catch (InterruptedException e) {
+         interrupted = true;
+         stopper.checkCancelInProgress(e);
+       } finally {
+         if (interrupted) {
+           Thread.currentThread().interrupt();
+         }
+       }
+     }
+   }
+ }
+
+ /**
+ * Call when the DM is ready to send messages.
+ */
+ private void readyToSendMsgs() {
+ synchronized (this.readyToSendMsgsLock) {
+ this.readyToSendMsgs = true;
+ this.readyToSendMsgsLock.notifyAll();
+ }
+ }
+ /**
+ * Return when DM is ready to send out messages.
+ * @param msg the messsage that is currently being sent
+ */
+ protected void waitUntilReadyToSendMsgs(DistributionMessage msg) {
+ if (this.readyToSendMsgs) {
+ return;
+ }
+ // another process may have been started in the same view, so we need
+ // to be responsive to startup messages and be able to send responses
+ if (msg instanceof StartupMessage || msg instanceof StartupResponseMessage
+ || msg instanceof AdminMessageType) {
+ return;
+ }
+ if (isStartupThread.get() != null) {
+ // let the startup thread send messages
+ // the only case I know of that does this is if we happen to log a
+ // message during startup and an alert listener has registered.
+ return;
+ }
+ // membershipManager.waitForEventProcessing();
+ synchronized (this.readyToSendMsgsLock) {
+ for (;;) {
+ if (this.readyToSendMsgs)
+ break;
+ stopper.checkCancelInProgress(null);
+ boolean interrupted = Thread.interrupted();
+ try {
+ this.readyToSendMsgsLock.wait();
+ }
+ catch (InterruptedException e) {
+ interrupted = true;
+ stopper.checkCancelInProgress(e);
+ }
+ finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } // for
+ } // synchronized
+ }
+
+ /**
+  * DM interface method: delegates to the membership manager's
+  * forceUDPMessagingForCurrentThread().
+  */
+ @Override
+ public void forceUDPMessagingForCurrentThread() {
+   this.membershipManager.forceUDPMessagingForCurrentThread();
+ }
+
+ /**
+  * DM interface method: delegates to the membership manager's
+  * releaseUDPMessagingForCurrentThread().
+  */
+ @Override
+ public void releaseUDPMessagingForCurrentThread() {
+   this.membershipManager.releaseUDPMessagingForCurrentThread();
+ }
+
+ /**
+  * Reports whether an exception occurred in a thread launched by this
+  * distribution manager: either the exceptionInThreads flag is set or the
+  * thread group has counted an uncaught exception.
+  */
+ public boolean exceptionInThreads() {
+   if (this.exceptionInThreads) {
+     return true;
+   }
+   return this.threadGroup.getUncaughtExceptionsCount() > 0;
+ }
+
+ /**
+  * Resets both sources consulted by {@link #exceptionInThreads()}: the
+  * local flag and the thread group's uncaught-exception count.
+  * For testing purposes only!
+  */
+ void clearExceptionInThreads() {
+   exceptionInThreads = false;
+   threadGroup.clearUncaughtExceptionsCount();
+ }
+
+ /**
+  * Returns the current "cache time" in milliseconds since the epoch, as
+  * reported by the distributed system's clock. Cache time accounts for
+  * skew among the local clocks of the machines in the cache.
+  */
+ public long cacheTimeMillis() {
+   return system.getClock().cacheTimeMillis();
+ }
+
+
+
+ /**
+  * Returns this distribution manager's own member id (the membership
+  * manager's local member).
+  */
+ public InternalDistributedMember getDistributionManagerId() {
+   return myid;
+ }
+
+ /**
+  * Returns the identities of all known (non-admin-only) distribution
+  * managers: the key set of {@code members}, read under membersLock.
+  * Callers must not modify the returned set.
+  */
+ public Set getDistributionManagerIds() {
+   final Set ids;
+   synchronized (this.membersLock) {
+     ids = this.members.keySet();
+   }
+   return ids;
+ }
+
+ /**
+  * Adds the entry in {@link #hostedLocatorsAll} for a member with one or more
+  * hosted locators. The value is a collection of host[port] strings. If a
+  * bind-address was used for a locator then the form is bind-addr[port].
+  * <p>
+  * When {@code isSharedConfigurationEnabled} is true, the member is also
+  * recorded in {@link #hostedLocatorsWithSharedConfiguration}, and the
+  * system-wide shared-configuration flag is switched on (this method never
+  * switches it off). Both maps are replaced with new unmodifiable copies
+  * under membersLock rather than mutated in place.
+  *
+  * @param member the member hosting the locators
+  * @param locators non-empty collection of host[port] strings
+  * @param isSharedConfigurationEnabled whether the member's locators have
+  *        shared configuration enabled
+  * @throws IllegalArgumentException if {@code locators} is null or empty
+  * @since 6.6.3
+  */
+ public void addHostedLocators(InternalDistributedMember member, Collection<String> locators, boolean isSharedConfigurationEnabled) {
+   // Validate once, before taking the lock.
+   if (locators == null || locators.isEmpty()) {
+     throw new IllegalArgumentException("Cannot use empty collection of locators");
+   }
+   synchronized (this.membersLock) {
+     if (!this.isSharedConfigEnabledForDS) {
+       this.isSharedConfigEnabledForDS = isSharedConfigurationEnabled;
+     }
+
+     Map<InternalDistributedMember, Collection<String>> tmp =
+         new HashMap<InternalDistributedMember, Collection<String>>(this.hostedLocatorsAll);
+     // remove-then-put (rather than a plain put) so the newest member id
+     // object becomes the key; HashMap.put keeps an existing equal key.
+     tmp.remove(member);
+     tmp.put(member, locators);
+     this.hostedLocatorsAll = Collections.unmodifiableMap(tmp);
+
+     if (isSharedConfigurationEnabled) {
+       tmp = new HashMap<InternalDistributedMember, Collection<String>>(this.hostedLocatorsWithSharedConfiguration);
+       tmp.remove(member);
+       tmp.put(member, locators);
+       this.hostedLocatorsWithSharedConfiguration = Collections.unmodifiableMap(tmp);
+     }
+   }
+ }
+
+
+ private void removeHostedLocators(InternalDistributedMember member) {
+ synchronized (this.membersLock) {
+ if (this.hostedLocatorsAll.containsKey(member)) {
+ Map<InternalDistributedMember, Collection<String>> tmp =
+ new HashMap<InternalDistributedMember, Collection<String>>(this.hostedLocatorsAll);
+ tmp.remove(member);
+ if (tmp.isEmpty()) {
+ tmp = Collections.emptyMap();
+ } else {
+ tmp = Collections.unmodifiableMap(tmp);
+ }
+ this.hostedLocatorsAll = tmp;
+ }
+ if (this.hostedLocatorsWithSharedConfiguration.containsKey(member)) {
+ Map<InternalDistributedMember, Collection<String>> tmp =
+ new HashMap<InternalDistributedMember, Collection<String>>(this.hostedLocatorsWithSharedConfiguration);
+ tmp.remove(member);
+ if (tmp.isEmpty()) {
+ tmp = Collections.emptyMap();
+ } else {
+ tmp = Collections.unmodifiableMap(tmp);
+ }
+ this.hostedLocatorsWithSharedConfiguration = tmp;
+ }
+ }
+ }
+
+
+
+ /**
+  * Returns the host[port] strings recorded in {@link #hostedLocatorsAll}
+  * for {@code member} (bind-addr[port] when a bind-address was used), or
+  * {@code null} if the member hosts no known locators.
+  *
+  * @since 6.6.3
+  */
+ public Collection<String> getHostedLocators(InternalDistributedMember member) {
+   synchronized (this.membersLock) {
+     final Collection<String> found = this.hostedLocatorsAll.get(member);
+     return found;
+   }
+ }
+
+ /**
+  * Returns the map of all members hosting locators. The key is the
+  * member, and the value is a collection of host[port] strings. If a
+  * bind-address was used for a locator then the form is bind-addr[port].
+  * <p>
+  * This is the map currently held by this manager, not a fresh copy.
+  * {@code addHostedLocators} and {@code removeHostedLocators} replace it
+  * with a new unmodifiable map rather than mutating it, so callers must
+  * treat the result as a read-only snapshot.
+  *
+  * @since 6.6.3
+  */
+ public Map<InternalDistributedMember, Collection<String>> getAllHostedLocators() {
+   synchronized (this.membersLock) {
+     return this.hostedLocatorsAll;
+   }
+ }
+ /**
+  * Returns the map of all members hosting locators with shared
+  * configuration enabled. The key is the member, and the value is a
+  * collection of host[port] strings. If a bind-address was used for a
+  * locator then the form is bind-addr[port].
+  * <p>
+  * This is the map currently held by this manager, not a fresh copy.
+  * {@code addHostedLocators} and {@code removeHostedLocators} replace it
+  * with a new unmodifiable map rather than mutating it, so callers must
+  * treat the result as a read-only snapshot.
+  *
+  * @since 8.0
+  */
+ @Override
+ public Map<InternalDistributedMember, Collection<String>> getAllHostedLocatorsWithSharedConfiguration() {
+   synchronized (this.membersLock) {
+     return this.hostedLocatorsWithSharedConfiguration;
+   }
+ }
+ /**
+  * Returns the identities of all known distribution managers, admin-only
+  * members included (the {@code membersAndAdmin} set, read under
+  * membersLock). Callers must not modify the returned set.
+  */
+ public Set getDistributionManagerIdsIncludingAdmin() {
+   final Set everyone;
+   synchronized (this.membersLock) {
+     everyone = this.membersAndAdmin;
+   }
+   return everyone;
+ }
+
+
+ /**
+  * Returns the low-level distribution channel created over the membership
+  * manager at construction (brought over from ConsoleDistributionManager).
+  *
+  * @since 4.0
+  */
+ public DistributionChannel getDistributionChannel() {
+   return channel;
+ }
+
+
+ /**
+  * Returns a new, caller-owned set holding the identities of all other
+  * known distribution managers, i.e. excluding this one.
+  */
+ public Set getOtherDistributionManagerIds() {
+   // getDistributionManagerIds() reads the shared set under the lock; copy
+   // it so our own id can be dropped without touching the shared set.
+   final Set others = new HashSet(getDistributionManagerIds());
+   // Our own id may legitimately be missing this early; remove() is a no-op then.
+   others.remove(getDistributionManagerId());
+   return others;
+ }
+ /**
+  * Returns a new, caller-owned set holding the normal distribution
+  * manager ids reported by getNormalDistributionManagerIds(), minus this
+  * manager's own id.
+  */
+ @Override
+ public Set getOtherNormalDistributionManagerIds() {
+   final Set others = new HashSet(getNormalDistributionManagerIds());
+   // Our own id may legitimately be missing this early; remove() is a no-op then.
+   others.remove(getDistributionManagerId());
+   return others;
+ }
+
+ /**
+  * Returns the canonical member id instance stored in {@code members} for
+  * {@code id}. If there is none, {@code id} itself is returned, cast to
+  * InternalDistributedMember.
+  */
+ public InternalDistributedMember getCanonicalId(DistributedMember id) {
+   // No lock: members is replaced copy-on-write, so an unsynchronized
+   // lookup is safe.
+   final InternalDistributedMember canonical = this.members.get(id);
+   return (canonical != null) ? canonical : (InternalDistributedMember) id;
+ }
+
+ /**
+  * Adds a membership listener and returns the current distribution
+  * manager ids as one atomic step under membersLock. The listener
+  * (typically a ReplyProcessor) therefore sees a view consistent with the
+  * returned set.
+  */
+ public Set addMembershipListenerAndGetDistributionManagerIds(MembershipListener l) {
+   final Set currentIds;
+   // switched sync order to fix bug 30360
+   synchronized (this.membersLock) {
+     addMembershipListener(l);
+     // Returning the members key set directly is safe because it is never
+     // modified after being handed out.
+     currentIds = members.keySet();
+   }
+   return currentIds;
+ }
+
+ /**
+  * Dispatches a newly seen member by VM kind. Admin-only members go to
+  * handleConsoleStartup; normal and locator members go to
+  * handleManagerStartup.
+  * (Original note: this is the place to clean up zombie members.)
+  *
+  * @throws InternalGemFireError for an unknown VM kind
+  */
+ public void addNewMember(InternalDistributedMember member) {
+   final int vmType = member.getVmKind();
+   if (vmType == ADMIN_ONLY_DM_TYPE) {
+     handleConsoleStartup(member);
+   } else if (vmType == LOCATOR_DM_TYPE || vmType == NORMAL_DM_TYPE) {
+     handleManagerStartup(member);
+   } else {
+     throw new InternalGemFireError(LocalizedStrings.DistributionManager_UNKNOWN_MEMBER_TYPE_0.toLocalizedString(Integer.valueOf(vmType)));
+   }
+ }
+
+ /**
+  * Returns this manager's own member id; equivalent to
+  * getDistributionManagerId().
+  */
+ public InternalDistributedMember getId() {
+   return myid;
+ }
+
+ /**
+  * Returns the id of the distribution channel this manager communicates
+  * through.
+  *
+  * @since 3.0
+  */
+ public long getChannelId() {
+   final DistributionChannel ch = this.channel;
+   return ch.getId();
+ }
+
+ /**
+  * Sends {@code message}; it must not be modified after this call and is
+  * recycled once distributed. Unlike putOutgoing, the message observer is
+  * not invoked, and serialization failures reach the caller unchanged.
+  *
+  * @return the recipients that did not receive the message
+  * @throws NotSerializableException if the content is not serializable
+  */
+ public Set putOutgoingUserData(final DistributionMessage message)
+     throws NotSerializableException {
+   final Set failedRecipients = sendMessage(message);
+   return failedRecipients;
+ }
+
+ /**
+ * Send outgoing data; message is guaranteed to be serialized.
+ * @return list of recipients who did not receive the message
+ * @throws InternalGemFireException if message is not serializable
+ */
+ public Set putOutgoing(final DistributionMessage msg) {
+ try {
+ DistributionMessageObserver observer = DistributionMessageObserver.getInstance();
+ if(observer != null) {
+ observer.beforeSendMessage(this, msg);
+ }
+ return sendMessage(msg);
+ }
+ catch (NotSerializableException e) {
+ throw new InternalGemFireException(e);
+ }
+ catch (ToDataException e) {
+ // exception from user code
+ throw e;
+ }
+ }
+
+   /**
+    * Returns this manager's description string.
+    */
+   @Override
+   public String toString() {
+     return description;
+   }
+
+ /**
+ * Lock guarding the check-and-set of {@link #closeInProgress} in
+ * shutdown(), so that only one thread initiates shutdown.
+ * @see #closeInProgress
+ */
+ private final Object shutdownMutex = new Object();
+
+ /**
+ * Informs other members that this dm is shutting down, then tears down the
+ * local side by calling uncleanShutdown(false).
+ * <p>
+ * If closeInProgress is already set, this returns immediately; otherwise it
+ * sets the flag under shutdownMutex and proceeds. The shutdown message is
+ * sent from a separate thread that is given at most MAX_STOP_TIME ms. If that
+ * thread is still alive afterwards, it is interrupted and a warning is
+ * logged. Sending is skipped entirely when rootCause is a
+ * ForcedDisconnectException. The total elapsed time is logged at info level.
+ */
+ protected void shutdown() {
+ // Make sure only one thread initiates shutdown...
+ synchronized (shutdownMutex) {
+ if (closeInProgress) {
+ return;
+ }
+ this.closeInProgress = true;
+ } // synchronized
+
+ // [bruce] log shutdown at info level and with ID to balance the
+ // "Starting" message. recycleConn.conf is hard to debug w/o this
+ final String exceptionStatus = (this.exceptionInThreads() ? LocalizedStrings.DistributionManager_AT_LEAST_ONE_EXCEPTION_OCCURRED.toLocalizedString() : "");
+ logger.info(LocalizedMessage.create(
+ LocalizedStrings.DistributionManager_SHUTTING_DOWN_DISTRIBUTIONMANAGER_0_1,
+ new Object[] {this.myid, exceptionStatus}));
+
+ final long start = System.currentTimeMillis();
+ try {
+ if (this.rootCause instanceof ForcedDisconnectException) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("inhibiting sending of shutdown message to other members due to forced-disconnect");
+ }
+ } else {
+ // Don't block indefinitely trying to send the shutdown message, in
+ // case other VMs in the system are ill-behaved. (bug 34710)
+ final Runnable r = new Runnable() {
+ public void run() {
+ try {
+ ConnectionTable.threadWantsSharedResources();
+ sendShutdownMessage();
+ }
+ catch (final CancelException e) {
+ // We were terminated.
+ logger.debug("Cancelled during shutdown message", e);
+ }
+ }
+ };
+ final Thread t = new Thread(threadGroup,
+ r, LocalizedStrings.DistributionManager_SHUTDOWN_MESSAGE_THREAD_FOR_0.toLocalizedString(this.myid));
+ t.start();
+ // Clear (and remember) any pending interrupt on this thread so that the
+ // join below actually waits; the status is restored in the finally.
+ boolean interrupted = Thread.interrupted();
+ try {
+ t.join(MAX_STOP_TIME);
+ }
+ catch (final InterruptedException e) {
+ // Interrupted while waiting: stop the sender and stop waiting for it.
+ interrupted = true;
+ t.interrupt();
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.DistributionManager_INTERRUPTED_SENDING_SHUTDOWN_MESSAGE_TO_PEERS), e);
+ }
+ finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ // The sender did not finish within MAX_STOP_TIME: interrupt it and move on.
+ if (t.isAlive()) {
+ t.interrupt();
+ logger.warn(LocalizedMessage.create(LocalizedStrings.DistributionManager_FAILED_SENDING_SHUTDOWN_MESSAGE_TO_PEERS_TIMEOUT));
+ }
+ }
+
+ }
+ finally {
+ this.shutdownMsgSent = true; // in case sendShutdownMessage failed....
+ try {
+ // Local teardown always runs, whether or not the message was sent.
+ this.uncleanShutdown(false);
+ }
+ finally {
+ final Long delta = Long.valueOf(System.currentTimeMillis() - start);
+ logger.info(LocalizedMessage.create(
+ LocalizedStrings.DistributionManager_DISTRIBUTIONMANAGER_STOPPED_IN_0_MS, delta));
+ }
+ }
+ }
+
+ private void askThreadsToStop() {
+ // Stop executors after they have finished
+ ExecutorService es;
+ es = this.serialThread;
+ if (es != null) {
+ es.shutdown();
+ }
+ es = this.viewThread;
+ if (es != null) {
+ // Hmmm...OK, I'll let any view events currently in the queue be
+ // processed. Not sure it's very important whether they get
+ // handled...
+ es.shutdown();
+ }
+ if (this.serialQueuedExecutorPool != null) {
+ this.serialQueuedExecutorPool.shutdown();
+ }
+ es = this.functionExecutionThread;
+ if (es != null) {
+ es.shutdown();
+ }
+ es = this.functionExecutionPool;
+ if (es != null) {
+ es.shutdown();
+ }
+ es = this.partitionedRegionThread;
+ if (es != null) {
+ es.shutdown();
+ }
+ es = this.partitionedRegionPool;
+ if (es != null) {
+ es.shutdown();
+ }
+ es = this.highPriorityPool;
+ if (es != null) {
+ es.shutdown();
+ }
+ es = this.waitingPool;
+ if (es != null) {
+ es.shutdown();
+ }
+ es = this.prMetaDataCleanupThreadPool;
+ if (es != null) {
+ es.shutdown();
+ }
+ es = this.threadPool;
+ if (es != null) {
+ es.shutdown();
+ }
+
+ Thread th = this.memberEventThread;
+ if (th != null)
+ th.interrupt();
+ }
+
+ private void waitForThreadsToStop(long timeInMillis) throws InterruptedException {
+ long start = System.currentTimeMillis();
+ long remaining = timeInMillis;
+
+ ExecutorService[] allExecutors = new ExecutorService[] {
+ this.serialThread,
+ this.viewThread,
+ this.functionExecutionThread,
+ this.functionExecutionPool,
+ this.partitionedRegionThread,
+ this.partitionedRegionPool,
+ this.highPriorityPool,
+ this.waitingPool,
+ this.prMetaDataCleanupThreadPool,
+ this.threadPool};
+ for(ExecutorService es : allExecutors) {
+ if (es != null) {
+ es.awaitTermination(remaining, TimeUnit.MILLISECONDS);
+ }
+ remaining = timeInMillis - (System.currentTimeMillis() - start);
+ if(remaining <= 0) {
+ return;
+ }
+ }
+
+
+ this.serialQueuedExecutorPool.awaitTermination(remaining, TimeUnit.MILLISECONDS);
+ remaining = timeInMillis - (System.currentTimeMillis() - start);
+ if(remaining <= 0) {
+ return;
+ }
+ Thread th = this.memberEventThread;
+ if (th != null) {
+ th.interrupt(); // bug #43452 - this thread sometimes eats interrupts, so we interrupt it again here
+ th.join(remaining);
+ }
+
+ }
+
+   /** Upper bound, in milliseconds, to wait for threads to exit during shutdown. */
+   private static final int MAX_STOP_TIME = 20000;
+
+   /** Polling interval, in milliseconds, used while waiting for threads to finish. */
+   private static final int STOP_PAUSE_TIME = 1000;
+
+   /** Maximum number of interrupt attempts made when stopping a single thread. */
+   private static final int MAX_STOP_ATTEMPTS = 10;
+
+   /**
+    * Makes a best-effort attempt to stop {@code t}. It interrupts the thread,
+    * then repeatedly joins it for STOP_PAUSE_TIME ms and re-interrupts it, up
+    * to MAX_STOP_ATTEMPTS times.
+    * <p>
+    * Logs a warning before starting and another if the thread is still alive
+    * afterwards. Does nothing when {@code t} is null or not alive.
+    *
+    * @param t the thread to stop; may be null
+    */
+   private void clobberThread(Thread t) {
+     if (t == null || !t.isAlive()) {
+       return;
+     }
+     logger.warn(LocalizedMessage.create(LocalizedStrings.DistributionManager_FORCING_THREAD_STOP_ON__0_, t));
+
+     // Start politely; Thread.stop() is deliberately not used.
+     t.interrupt();
+     try {
+       int attempt = 0;
+       while (attempt < MAX_STOP_ATTEMPTS && t.isAlive()) {
+         t.join(STOP_PAUSE_TIME);
+         t.interrupt();
+         attempt++;
+       }
+     } catch (InterruptedException ex) {
+       logger.debug("Interrupted while attempting to terminate threads.");
+       Thread.currentThread().interrupt();
+       // carry on with the final liveness check
+     }
+
+     if (t.isAlive()) {
+       logger.warn(LocalizedMessage.create(LocalizedStrings.DistributionManager_CLOBBERTHREAD_THREAD_REFUSED_TO_DIE__0, t));
+     }
+   }
+
+ /**
+ * Cheap tool to examine an executor to see if it is still working
+ * @param tpe
+ * @return true if executor is still active
+ */
+ private boolean executorAlive(ThreadPoolExecutor tpe, String name)
+ {
+ if (tpe == null) {
+ return false;
+ } else {
+ int ac = tpe.getActiveCount();
+ // boolean result = tpe.getActiveCount() > 0;
+ if (ac > 0) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Still waiting for {} threads in '{}' pool to exit", ac, name);
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Wait for the ancillary queues to exit. Kills them if they are
+ * still around.
+ *
+ */
+ private void forceThreadsToStop() {
+ long endTime = System.currentTimeMillis() + MAX_STOP_TIME;
+ String culprits = "";
+ for (;;) {
+ boolean stillAlive = false;
+ culprits = "";
+ if (executorAlive(this.serialThread, "serial thread")) {
+ stillAlive = true;
+ culprits = culprits + " serial thread;";
+ }
+ if (executorAlive(this.viewThread, "view thread")) {
+ stillAlive = true;
+ culprits = culprits + " view thread;";
+ }
+ if (executorAlive(this.partitionedRegionThread, "partitioned region thread")) {
+ stillAlive = true;
+ culprits = culprits + " partitioned region thread;";
+ }
+ if (executorAlive(this.partitionedRegionPool, "partitioned region pool")) {
+ stillAlive = true;
+ culprits = culprits + " partitioned region pool;";
+ }
+ if (executorAlive(this.highPriorityPool, "high priority pool")) {
+ stillAlive = true;
+ culprits = culprits + " high priority pool;";
+ }
+ if (executorAlive(this.waitingPool, "waiting pool")) {
+ stillAlive = true;
+ culprits = culprits + " waiting pool;";
+ }
+ if (executorAlive(this.prMetaDataCleanupThreadPool, "prMetaDataCleanupThreadPool")) {
+ stillAlive = true;
+ culprits = culprits + " special waiting pool;";
+ }
+ if (executorAlive(this.threadPool, "thread pool")) {
+ stillAlive = true;
+ culprits = culprits + " thread pool;";
+ }
+
+ if (!stillAlive)
+ return;
+
+ long now = System.currentTimeMillis();
+ if (now >= endTime)
+ break;
+
+ try {
+ Thread.sleep(STOP_PAUSE_TIME);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ // Desperation, the shutdown thread is being killed. Don't
+ // consult a CancelCriterion.
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.DistributionManager_INTERRUPTED_DURING_SHUTDOWN), e);
+ break;
+ }
+ } // for
+
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.DistributionManager_DAEMON_THREADS_ARE_SLOW_TO_STOP_CULPRITS_INCLUDE_0, culprits));
+
+ // Kill with no mercy
+ if (this.serialThread != null) {
+ this.serialThread.shutdownNow();
+ }
+ if (this.viewThread != null) {
+ this.viewThread.shutdownNow();
+ }
+ if (this.functionExecutionThread != null) {
+ this.functionExecutionThread.shutdownNow();
+ }
+ if (this.functionExecutionPool != null) {
+ this.functionExecutionPool.shutdownNow();
+ }
+ if (this.partitionedRegionThread != null) {
+ this.partitionedRegionThread.shutdownNow();
+ }
+ if (this.partitionedRegionPool != null) {
+ this.partitionedRegionPool.shutdownNow();
+ }
+ if (this.highPriorityPool != null) {
+ this.highPriorityPool.shutdownNow();
+ }
+ if (this.waitingPool != null) {
+ this.waitingPool.shutdownNow();
+ }
+ if (this.prMetaDataCleanupThreadPool != null) {
+ this.prMetaDataCleanupThreadPool.shutdownNow();
+ }
+ if (this.threadPool != null) {
+ this.threadPool.shutdownNow();
+ }
+
+ Thread th = this.memberEventThread;
+ if (th != null) {
+ clobberThread(th);
+ }
+ }
+
+   /** Set to true once teardown of this manager has begun. */
+   private volatile boolean shutdownInProgress = false;
+
+   /** guard for membershipViewIdAcknowledged */
+   private final Object membershipViewIdGuard = new Object();
+
+   /** the latest view ID that has been processed by all membership listeners */
+   private long membershipViewIdAcknowledged;
+
+   /**
+    * Reports whether shutdown of this manager is under way.
+    *
+    * @return the current value of the volatile shutdownInProgress flag
+    */
+   public boolean shutdownInProgress() {
+     return shutdownInProgress;
+   }
+
+ /**
+ * Tears down this manager locally; it sends no shutdown message itself.
+ * <p>
+ * The steps, in order:
+ * <ul>
+ * <li>set closeInProgress and shutdownInProgress, and call
+ * removeAllHealthMonitors();</li>
+ * <li>mark the channel as shut down;</li>
+ * <li>ask the executors to stop and wait up to 1000 ms for them;</li>
+ * <li>force-stop any that remain;</li>
+ * <li>close the stats;</li>
+ * <li>disconnect the distribution channel.</li>
+ * </ul>
+ * The stats close and the channel disconnect run in finally blocks, so the
+ * channel is disconnected even if an earlier step throws.
+ * This should only be used from shutdown() or from the dm initialization code.
+ *
+ * @param duringStartup passed through to channel.disconnect();
+ * NOTE(review): presumably true only when called from the initialization
+ * code — confirm against callers
+ */
+ private void uncleanShutdown(boolean duringStartup)
+ {
+ try {
+ this.closeInProgress = true; // set here also to fix bug 36736
+ removeAllHealthMonitors();
+ shutdownInProgress = true;
+ if (this.channel != null) {
+ this.channel.setShutDown();
+ }
+
+ askThreadsToStop();
+
+ // wait a moment before asking threads to terminate
+ try { waitForThreadsToStop(1000); }
+ catch (InterruptedException ie) {
+ // No need to reset interrupt bit, we're really trying to quit...
+ }
+ forceThreadsToStop();
+
+ // // bug36329: desperation measure, send a second interrupt?
+ // try { Thread.sleep(1000); }
+ // catch (InterruptedException ie) {
+ // // No need to reset interrupt bit, we're really trying to quit...
+ // }
+ // forceThreadsToStop();
+ } // try
+ finally {
+ // ABSOLUTELY ESSENTIAL that we close the distribution channel!
+ try {
+ // For safety, but channel close in a finally AFTER this...
+ if (this.stats != null) {
+ this.stats.close();
+ // NOTE(review): the reason for this short pause after closing the
+ // stats is not evident from this method.
+ try { Thread.sleep(100); }
+ catch (InterruptedException ie) {
+ // No need to reset interrupt bit, we're really trying to quit...
+ }
+ }
+ }
+ finally {
+ if (this.channel != null) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DistributionManager_NOW_CLOSING_DISTRIBUTION_FOR__0, this.myid));
+ this.channel.disconnect(duringStartup);
+ // this.channel = null; DO NOT NULL OUT INSTANCE VARIABLES AT SHUTDOWN - bug #42087
+ }
+ }
+ }
+ }
+
+   /**
+    * Returns the distributed system to which this distribution manager is
+    * connected.
+    *
+    * @return the {@link InternalDistributedSystem} for this manager
+    */
+   public InternalDistributedSystem getSystem() {
+     return system;
+   }
+
+   /**
+    * Returns the transport configuration used by this distribution manager.
+    *
+    * @return the {@link RemoteTransportConfig}
+    * @since 5.0
+    */
+   public RemoteTransportConfig getTransport() {
+     return transport;
+   }
+
+
+   /**
+    * Registers {@code l} as a {@link MembershipListener} of this distribution
+    * manager. Registering a listener that is already present has no effect.
+    *
+    * @param l the listener to add
+    */
+   public void addMembershipListener(MembershipListener l) {
+     membershipListeners.putIfAbsent(l, Boolean.TRUE);
+   }
+
+   /**
+    * Removes {@code l} from this distribution manager's membership listeners.
+    * <p>
+    * Removing a listener that is not registered has no effect. The result of
+    * the underlying map removal is not checked, so no
+    * IllegalArgumentException is raised for an unknown listener (the
+    * previous documentation claimed otherwise).
+    *
+    * @param l the listener to remove
+    */
+   public void removeMembershipListener(MembershipListener l) {
+     this.membershipListeners.remove(l);
+   }
+
+ /**
+ * Adds a <code>MembershipListener</code> to this distribution
+ * manager.
+ * @since 5.7
+ */
+ public void addAllMembershipListener(MembershipListener l) {
+ synchroniz
<TRUNCATED>