You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/02/22 22:44:07 UTC

[079/100] [abbrv] incubator-geode git commit: GEODE-917: Merge branch 'feature/GEODE-917' into develop

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
index 0000000,69ae6d8..f650fee
mode 000000,100755..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
@@@ -1,0 -1,3112 +1,3116 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package com.gemstone.gemfire.internal.cache.tier.sockets;
+ 
+ import java.io.ByteArrayInputStream;
+ import java.io.DataInputStream;
+ import java.io.IOException;
+ import java.net.Socket;
+ import java.net.SocketException;
+ import java.nio.ByteBuffer;
+ import java.util.Arrays;
+ import java.util.Collections;
+ import java.util.Date;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.ConcurrentLinkedQueue;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReadWriteLock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
+ import java.util.regex.Pattern;
+ 
+ import org.apache.logging.log4j.Logger;
+ 
+ import com.gemstone.gemfire.CancelException;
+ import com.gemstone.gemfire.DataSerializer;
+ import com.gemstone.gemfire.StatisticsFactory;
+ import com.gemstone.gemfire.cache.Cache;
+ import com.gemstone.gemfire.cache.CacheClosedException;
+ import com.gemstone.gemfire.cache.CacheException;
+ import com.gemstone.gemfire.cache.ClientSession;
+ import com.gemstone.gemfire.cache.DynamicRegionFactory;
+ import com.gemstone.gemfire.cache.InterestRegistrationEvent;
+ import com.gemstone.gemfire.cache.InterestResultPolicy;
+ import com.gemstone.gemfire.cache.Region;
+ import com.gemstone.gemfire.cache.RegionDestroyedException;
+ import com.gemstone.gemfire.cache.RegionExistsException;
+ import com.gemstone.gemfire.cache.client.internal.RegisterInterestTracker;
+ import com.gemstone.gemfire.cache.operations.DestroyOperationContext;
+ import com.gemstone.gemfire.cache.operations.InvalidateOperationContext;
+ import com.gemstone.gemfire.cache.operations.OperationContext;
+ import com.gemstone.gemfire.cache.operations.PutOperationContext;
+ import com.gemstone.gemfire.cache.operations.RegionClearOperationContext;
+ import com.gemstone.gemfire.cache.operations.RegionCreateOperationContext;
+ import com.gemstone.gemfire.cache.operations.RegionDestroyOperationContext;
+ import com.gemstone.gemfire.cache.query.CqException;
+ import com.gemstone.gemfire.cache.query.CqQuery;
+ import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+ import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
+ import com.gemstone.gemfire.distributed.DistributedMember;
+ import com.gemstone.gemfire.distributed.internal.DistributionManager;
+ import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+ import com.gemstone.gemfire.internal.SystemTimer;
+ import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
+ import com.gemstone.gemfire.internal.Version;
+ import com.gemstone.gemfire.internal.cache.ClientServerObserver;
+ import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
+ import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisee;
+ import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.InitialImageAdvice;
+ import com.gemstone.gemfire.internal.cache.Conflatable;
+ import com.gemstone.gemfire.internal.cache.DistributedRegion;
+ import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
+ import com.gemstone.gemfire.internal.cache.EventID;
+ import com.gemstone.gemfire.internal.cache.FilterProfile;
+ import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+ import com.gemstone.gemfire.internal.cache.InterestRegistrationEventImpl;
+ import com.gemstone.gemfire.internal.cache.LocalRegion;
+ import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+ import com.gemstone.gemfire.internal.cache.StateFlushOperation;
+ import com.gemstone.gemfire.internal.cache.ha.HAContainerWrapper;
+ import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
+ import com.gemstone.gemfire.internal.cache.ha.HARegionQueueAttributes;
+ import com.gemstone.gemfire.internal.cache.ha.HARegionQueueStats;
+ import com.gemstone.gemfire.internal.cache.tier.InterestType;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessageImpl.CqNameToOp;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.command.Get70;
+ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+ import com.gemstone.gemfire.internal.logging.LogService;
+ import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+ import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+ import com.gemstone.gemfire.internal.security.AuthorizeRequestPP;
+ import com.gemstone.gemfire.security.AccessControl;
+ import com.gemstone.gemfire.i18n.StringId;
+ 
+ /**
+  * Class <code>CacheClientProxy</code> represents the server side of the
+  * {@link CacheClientUpdater}. It queues messages to be sent from the server to
+  * the client. It then reads those messages from the queue and sends them to the
+  * client.
+  *
+  * @author Barry Oglesby
+  *
+  * @since 4.2
+  */
+ @SuppressWarnings("synthetic-access")
+ public class CacheClientProxy implements ClientSession {
+   private static final Logger logger = LogService.getLogger();
+   
+   /**
+    * The socket between the server and the client
+    */
+   protected Socket _socket;
+   
+   private final AtomicBoolean _socketClosed = new AtomicBoolean();
+ 
+   /**
+    * A communication buffer used by each message we send to the client
+    */
+   protected ByteBuffer _commBuffer;
+ 
+   /**
+    * The remote host's IP address string (cached for convenience)
+    */
+   protected String _remoteHostAddress;
+ 
+   /**
+    * Concurrency: protected by synchronization of {@link #isMarkedForRemovalLock}
+    */
+   protected boolean isMarkedForRemoval = false;
+   
+   /**
+    * @see #isMarkedForRemoval
+    */
+   protected final Object isMarkedForRemovalLock = new Object();
+   
+   /**
+    * The proxy id of the client represented by this proxy
+    */
+   protected ClientProxyMembershipID proxyID;
+ 
+   /**
+    * The GemFire cache
+    */
+   protected final GemFireCacheImpl _cache;
+ 
+   /**
+    * The list of keys that the client represented by this proxy is interested in
+    * (stored by region)
+    */
+   protected final ClientInterestList[] cils = new ClientInterestList[2];
+   
+   /**
+    * A thread that dispatches messages to the client
+    */
+   protected volatile MessageDispatcher _messageDispatcher;
+ 
+   /**
+    * The statistics for this proxy
+    */
+   protected final CacheClientProxyStats _statistics;
+ 
+   protected final AtomicReference _durableExpirationTask = new AtomicReference();
+ 
+   protected SystemTimer durableTimer;
+ 
+   /**
+    * Whether this dispatcher is paused
+    */
+   protected volatile boolean _isPaused = true;
+ 
+   /**
+    * True if we are connected to a client.
+    */
+   private volatile boolean connected = false;
+ //  /**
+ //   * A string representing interest in all keys
+ //   */
+ //  protected static final String ALL_KEYS = "ALL_KEYS";
+ //
+   /**
+    * True if a marker message is still in the ha queue.
+    */
+   private boolean markerEnqueued = false;
+ 
+   /**
+    * The number of times to peek on shutdown before giving up and shutting down
+    */
+   protected static final int MAXIMUM_SHUTDOWN_PEEKS = Integer.getInteger("gemfire.MAXIMUM_SHUTDOWN_PEEKS",50).intValue();
+ 
+   /**
+    * The number of milliseconds to wait for an offering to the message queue
+    */
+   protected static final int MESSAGE_OFFER_TIME = 0;
+ 
+   /**
+    * The default maximum message queue size
+    */
+ //   protected static final int MESSAGE_QUEUE_SIZE_DEFAULT = 230000;
+ 
+   /** The message queue size */
+   protected final int _maximumMessageCount;
+ 
+   /**
+    * The time (in seconds ) after which a message in the client queue will
+    * expire.
+    */
+   protected final int _messageTimeToLive;
+ 
+   /**
+    * The <code>CacheClientNotifier</code> registering this proxy.
+    */
+   protected final CacheClientNotifier _cacheClientNotifier;
+ 
+   /**
+    * Defaults to true; meaning do some logging of dropped client notification
+    * messages. Set the system property to true to cause dropped messages to NOT
+    * be logged.
+    */
+   protected static final boolean LOG_DROPPED_MSGS = !Boolean
+       .getBoolean("gemfire.disableNotificationWarnings");
+ 
+   /**
+    * for testing purposes, delays the start of the dispatcher thread
+    */
+   public static boolean isSlowStartForTesting = false;
+ 
+   /**
+    * Default value for slow starting time of dispatcher
+    */
+   private static final long DEFAULT_SLOW_STARTING_TIME = 5000;
+ 
+   /**
+    * Key in the system property from which the slow starting time value will be
+    * retrieved
+    */
+   private static final String KEY_SLOW_START_TIME_FOR_TESTING = "slowStartTimeForTesting";
+ 
+   private boolean isPrimary;
+   
+   /** @since 5.7 */
+   protected byte clientConflation = HandShake.CONFLATION_DEFAULT;
+   
+   /**
+    * Flag to indicate whether to keep a durable client's queue alive
+    */
+   boolean keepalive = false;
+ 
+   private AccessControl postAuthzCallback;
+ 
+   /**
+    * For multiuser environment..
+    */
+   private ClientUserAuths clientUserAuths;
+   
+   private final Object clientUserAuthsLock = new Object();
+   
+   /**
+    * The version of the client
+    */
+   private Version clientVersion;  
+ 
+   /**
+    * A map of region name as key and integer as its value. Basically, it stores
+    * the names of the regions with <code>DataPolicy</code> as EMPTY. If an 
+    * event's region name is present in this map, it's full value (and not 
+    * delta) is sent to the client represented by this proxy.
+    *
+    * @since 6.1
+    */
+   private volatile Map regionsWithEmptyDataPolicy = new HashMap();
+ 
+   /**
+    * A debug flag used for testing Backward compatibility
+    */
+   public static boolean AFTER_MESSAGE_CREATION_FLAG = false;
+   
+   /**
+    * Notify the region when a client interest registration occurs. This tells
+    * the region to update access time when an update is to be pushed to a
+    * client. It is enabled only for <code>PartitionedRegion</code>s
+    * currently.
+    */
+   protected static final boolean NOTIFY_REGION_ON_INTEREST = Boolean
+       .getBoolean("gemfire.updateAccessTimeOnClientInterest");   
+   
+   /**
+    * The AcceptorImpl identifier to which the proxy is connected.
+    */
+   private final long _acceptorId;
+   
+   /** acceptor's setting for notifyBySubscription */
+   private final boolean notifyBySubscription;
+   
+   /** To queue the events arriving during message dispatcher initialization */ 
+   private volatile ConcurrentLinkedQueue<Conflatable> queuedEvents = new ConcurrentLinkedQueue<Conflatable>();
+   
+   private final Object queuedEventsSync = new Object();
+   
+   private volatile boolean messageDispatcherInit = false;
+ 
+   /**
+    * A counter that keeps track of how many task iterations that have occurred
+    * since the last ping or message. The
+    * {@linkplain CacheClientNotifier#scheduleClientPingTask ping task}
+    * increments it. Normal messages sent to the client reset it. If the counter
+    * reaches 3, a ping is sent.
+    */
+   private final AtomicInteger pingCounter = new AtomicInteger();
+   
+   
+   /** Date on which this instances was created */
+   private Date creationDate;
+ 
+   /** true when the durable client associated with this proxy is being 
+    * restarted and prevents cqs from being closed and drained**/
+   private boolean drainLocked = false;
+   private final Object drainLock = new Object();
+ 
+   /** number of cq drains that are currently in progress **/
+   private int numDrainsInProgress = 0;
+   private final Object drainsInProgressLock = new Object();
+   
+   /**
+    * Constructor.
+    *
+    * @param ccn
+    *          The <code>CacheClientNotifier</code> registering this proxy
+    * @param socket
+    *          The socket between the server and the client
+    * @param proxyID
+    *          representing the Connection Proxy of the clien
+    * @param isPrimary
+    *          The boolean stating whether this prozxy is primary
+    * @throws CacheException {
+    */
+   protected CacheClientProxy(CacheClientNotifier ccn, Socket socket,
+       ClientProxyMembershipID proxyID, boolean isPrimary, byte clientConflation, 
+       Version clientVersion, long acceptorId, boolean notifyBySubscription)
+       throws CacheException {
+     initializeTransientFields(socket, proxyID, isPrimary, clientConflation, clientVersion);
+     this._cacheClientNotifier = ccn;
+     this._cache = (GemFireCacheImpl)ccn.getCache();
+     this._maximumMessageCount = ccn.getMaximumMessageCount();    
+     this._messageTimeToLive = ccn.getMessageTimeToLive();
+     this._acceptorId = acceptorId;
+     this.notifyBySubscription = notifyBySubscription;
+     StatisticsFactory factory = this._cache.getDistributedSystem();
+     this._statistics = new CacheClientProxyStats(factory, 
+         "id_"+this.proxyID.getDistributedMember().getId()+ "_at_"+ this._remoteHostAddress + ":" + this._socket.getPort());
+ 
+     // Create the interest list
+     this.cils[RegisterInterestTracker.interestListIndex] =
+       new ClientInterestList(this, this.proxyID);
+     // Create the durable interest list
+     this.cils[RegisterInterestTracker.durableInterestListIndex] =
+       new ClientInterestList(this, this.getDurableId());
+     this.postAuthzCallback = null;
+     this._cacheClientNotifier.getAcceptorStats().incCurrentQueueConnections();
+     this.creationDate = new Date();
+     initializeClientAuths();
+   }
+   
+   private void initializeClientAuths()
+   {
+     if(AcceptorImpl.isPostAuthzCallbackPresent())
+       this.clientUserAuths = ServerConnection.getClientUserAuths(this.proxyID);
+   }
+ 
+   private void reinitializeClientAuths()
+   {
+     if (this.clientUserAuths != null && AcceptorImpl.isPostAuthzCallbackPresent()) {
+       synchronized (this.clientUserAuthsLock) {
+         ClientUserAuths newClientAuth = ServerConnection.getClientUserAuths(this.proxyID);
+         newClientAuth.fillPreviousCQAuth(this.clientUserAuths);
+         this.clientUserAuths = newClientAuth;
+       }        
+     }
+   }
+   
+   public void setPostAuthzCallback(AccessControl authzCallback) {
+     //TODO:hitesh synchronization
+     synchronized (this.clientUserAuthsLock) {
+       if (this.postAuthzCallback != null)
+         this.postAuthzCallback.close();
+       this.postAuthzCallback = authzCallback;
+     }
+   }
+   
+   public void setCQVsUserAuth(String cqName, long uniqueId, boolean isDurable)
+   {
+     if(postAuthzCallback == null) //only for multiuser
+     {
+       if(this.clientUserAuths != null)
+         this.clientUserAuths.setUserAuthAttributesForCq(cqName, uniqueId, isDurable);
+     }
+   }
+ 
+   private void initializeTransientFields(Socket socket,
+       ClientProxyMembershipID pid, boolean ip,  byte cc, Version vers) {
+     this._socket = socket;
+     this.proxyID = pid;
+     this.connected = true;
+     {
+       int bufSize = 1024;
+       try {
+         bufSize = _socket.getSendBufferSize();
+         if (bufSize < 1024) {
+           bufSize = 1024;
+         }
+       } catch (SocketException ignore) {
+       }
+       this._commBuffer = ServerConnection.allocateCommBuffer(bufSize, socket);
+     }
+     this._remoteHostAddress = socket.getInetAddress().getHostAddress();
+     this.isPrimary = ip;
+     this.clientConflation = cc;
+     this.clientVersion = vers;
+   }
+ 
+   public boolean isMarkerEnqueued() {
+     return markerEnqueued;
+   }
+ 
+   public void setMarkerEnqueued(boolean bool) {
+     markerEnqueued = bool;
+   }
+ 
+   public long getAcceptorId(){
+     return this._acceptorId;
+   }
+ 
+   /**
+    * @return the notifyBySubscription
+    */
+   public boolean isNotifyBySubscription() {
+     return this.notifyBySubscription;
+   }
+ 
+   
+   /**
+    * Returns the DistributedMember represented by this proxy
+    */
+   public ClientProxyMembershipID getProxyID()
+   {
+     return this.proxyID;
+   }
+   
+   // the following code was commented out simply because it was not used
+ //   /**
+ //    * Determines if the proxy represents the client host (and only the host, not
+ //    * necessarily the exact VM running on the host)
+ //    *
+ //    * @return Whether the proxy represents the client host
+ //    */
+ //   protected boolean representsClientHost(String clientHost)
+ //   {
+ //     // [bruce] TODO BUGBUGBUG: this should compare InetAddresses, not Strings
+ //     return this._remoteHostAddress.equals(clientHost);
+ //   }
+ 
+ //   protected boolean representsClientVM(DistributedMember remoteMember)
+ //   {
+ //     // logger.warn("Is input port " + clientPort + " contained in " +
+ //     // logger.warn("Does input host " + clientHost + " equal " +
+ //     // this._remoteHostAddress+ ": " + representsClientHost(clientHost));
+ //     // logger.warn("representsClientVM: " +
+ //     // (representsClientHost(clientHost) && containsPort(clientPort)));
+ //     return (proxyID.getDistributedMember().equals(remoteMember));
+ //   }
+ 
+ //   /**
+ //    * Determines if the CacheClientUpdater proxied by this instance is listening
+ //    * on the input clientHost and clientPort
+ //    *
+ //    * @param clientHost
+ //    *          The host name of the client to compare
+ //    * @param clientPort
+ //    *          The port number of the client to compare
+ //    *
+ //    * @return Whether the CacheClientUpdater proxied by this instance is
+ //    *         listening on the input clientHost and clientPort
+ //    */
+ //   protected boolean representsCacheClientUpdater(String clientHost,
+ //       int clientPort)
+ //   {
+ //     return (clientPort == this._socket.getPort() && representsClientHost(clientHost));
+ //   }
+ 
+   protected boolean isMember(ClientProxyMembershipID memberId)
+   {
+     return this.proxyID.equals(memberId);
+   }
+ 
+   protected boolean isSameDSMember(ClientProxyMembershipID memberId)
+   {
+     return this.proxyID.isSameDSMember(memberId);
+   }
+ 
+   /**
+    * Set the queue keepalive option
+    *
+    * @param option whether to keep the durable client's queue alive
+    */
+   protected void setKeepAlive(boolean option) {
+     this.keepalive = option;
+   }
+ 
+   /**
+    * Returns the socket between the server and the client
+    *
+    * @return the socket between the server and the client
+    */
+   protected Socket getSocket()
+   {
+     return this._socket;
+   }
+   
+   public String getSocketHost()
+   {
+     return this._socket.getInetAddress().getHostAddress();
+   }
+   
+   protected ByteBuffer getCommBuffer() {
+     return this._commBuffer;
+   }
+ 
+   /**
+    * Returns the remote host's IP address string
+    *
+    * @return the remote host's IP address string
+    */
+   protected String getRemoteHostAddress()
+   {
+     return this._remoteHostAddress;
+   }
+ 
+   /**
+    * Returns the remote host's port
+    *
+    * @return the remote host's port
+    */
+   public int getRemotePort()
+   {
+     return this._socket.getPort();
+   }
+ 
+   /**
+    * Returns whether the proxy is connected to a remote client
+    *
+    * @return whether the proxy is connected to a remote client
+    */
+   public boolean isConnected() {
+     return this.connected;
+   }
+ 
+   /**
+    * Mark the receiver as needing removal
+    * @return true if it was already marked for removal
+    */
+   protected boolean startRemoval() {
+     boolean result;
+     synchronized (this.isMarkedForRemovalLock) {
+       result = this.isMarkedForRemoval;
+       this.isMarkedForRemoval = true;
+     }
+     return result;
+   }
+   
+   /**
+    * Wait until the receiver's removal has completed before
+    * returning.
+    * @return true if the proxy was initially marked for removal
+    */
+   protected boolean waitRemoval() {
+     boolean result;
+     synchronized (this.isMarkedForRemovalLock) {
+       result = this.isMarkedForRemoval;
+       boolean interrupted = false;
+       try {
+         while (this.isMarkedForRemoval) {
+           if (logger.isDebugEnabled()) {
+             logger.debug("Waiting for CacheClientProxy removal: {}", this);
+           }
+           try {
+             this.isMarkedForRemovalLock.wait();
+           }
+           catch (InterruptedException e) {
+             interrupted = true;
+             this._cache.getCancelCriterion().checkCancelInProgress(e);
+           }
+         } // while
+       }
+       finally {
+         if (interrupted) {
+           Thread.currentThread().interrupt();
+         }
+       }
+     } // synchronized
+     return result;
+   }
+   
+   /**
+    * Indicate that removal has completed on this instance
+    */
+   protected void notifyRemoval() {
+     synchronized (this.isMarkedForRemovalLock) {
+       this.isMarkedForRemoval = false;
+       this.isMarkedForRemovalLock.notifyAll();
+     }
+   }
+   
+   /**
+    * Returns the GemFire cache
+    *
+    * @return the GemFire cache
+    */
+   public GemFireCacheImpl getCache()
+   {
+     return this._cache;
+   }
+ 
+   public Set<String> getInterestRegisteredRegions() {
+     HashSet<String> regions = new HashSet<String>();
+     for(int i=0; i < this.cils.length; i++){
+       if (!this.cils[i].regions.isEmpty()) {
+         regions.addAll(this.cils[i].regions);
+       }
+     }
+     return regions;
+   }
+   
+   /**
+    * Returns the proxy's statistics
+    *
+    * @return the proxy's statistics
+    */
+   public CacheClientProxyStats getStatistics()
+   {
+     return this._statistics;
+   }
+ 
+   /**
+    * Returns this proxy's <code>CacheClientNotifier</code>.
+    * @return this proxy's <code>CacheClientNotifier</code>
+    */
+   protected CacheClientNotifier getCacheClientNotifier() {
+     return this._cacheClientNotifier;
+   }
+ 
+   /**
+    * Returns the size of the queue for heuristic purposes.  This
+    * size may be changing concurrently if puts/gets are occurring
+    * at the same time.
+    */
+   public int getQueueSize() {
+     return this._messageDispatcher == null ? 0
+         : this._messageDispatcher.getQueueSize();
+   }
+   
+   /** 
+    * returns the queue size calculated through stats
+    */
+   public int getQueueSizeStat() {
+     return this._messageDispatcher == null ? 0
+         : this._messageDispatcher.getQueueSizeStat();  
+   }
+   
+       
+   public boolean drainInProgress() {
+     synchronized(drainsInProgressLock) {
+       return numDrainsInProgress > 0;
+     }
+   }
+   
+   //Called from CacheClientNotifier when attempting to restart paused proxy
+   //locking the drain lock requires that no drains are in progress 
+   //when the lock was acquired.
+   public boolean lockDrain() {
+     synchronized(drainsInProgressLock) {
+       if (!drainInProgress()) {
+         synchronized(drainLock) {
+           if (testHook != null) {
+             testHook.doTestHook("PRE_ACQUIRE_DRAIN_LOCK_UNDER_SYNC");
+           }
+           //prevent multiple lockings of drain lock
+           if (!drainLocked) {
+             drainLocked = true;
+             return true;
+           }
+         }
+       }
+     }
+     return false;
+   }
+   
+   //Called from CacheClientNotifier when completed restart of proxy
+   public void unlockDrain() {
+     if (testHook != null) {
+       testHook.doTestHook("PRE_RELEASE_DRAIN_LOCK");
+     }
+     synchronized(drainLock) {
+       drainLocked = false;
+     }
+   }
+   
+   //Only close the client cq if it is paused and no one is attempting to restart the proxy
+   public boolean closeClientCq(String clientCQName) throws CqException {
+     if (testHook != null) {
+       testHook.doTestHook("PRE_DRAIN_IN_PROGRESS");
+     }
+     synchronized(drainsInProgressLock) {
+       numDrainsInProgress ++;
+     }
+     if (testHook != null) {
+       testHook.doTestHook("DRAIN_IN_PROGRESS_BEFORE_DRAIN_LOCK_CHECK");
+     }
+     try {
+       //If the drain lock was acquired, the other thread did so before we could bump up
+       //the numDrainsInProgress.  That means we need to stop.
+       if (drainLocked) {
+         // someone is trying to restart a paused proxy
+         String msg = LocalizedStrings.CacheClientProxy_COULD_NOT_DRAIN_CQ_DUE_TO_RESTARTING_DURABLE_CLIENT.toLocalizedString(clientCQName, proxyID.getDurableId());
+         logger.info(msg);
+         throw new CqException(msg);
+       }
+       //isConnected is to protect against the case where a durable client has reconnected
+       //but has not yet sent a ready for events message
+       //we can probably remove the isPaused check
+       if (isPaused() && !isConnected()) {
+         CqService cqService = getCache().getCqService();
+         if (cqService != null) {
+           InternalCqQuery  cqToClose = cqService.getCq(cqService.constructServerCqName(
+               clientCQName, this.proxyID));
+           // close and drain
+           if (cqToClose != null) {
+             cqService.closeCq(clientCQName, this.proxyID);
+             this._messageDispatcher.drainClientCqEvents(this.proxyID, cqToClose);
+           }
+           else {
+             String msg = LocalizedStrings.CqService_CQ_NOT_FOUND_FAILED_TO_CLOSE_THE_SPECIFIED_CQ_0.toLocalizedString(clientCQName);
+             logger.info(msg);
+             throw new CqException(msg);
+           }
+         }
+       } else {
+         String msg = LocalizedStrings.CacheClientProxy_COULD_NOT_DRAIN_CQ_DUE_TO_ACTIVE_DURABLE_CLIENT.toLocalizedString(clientCQName, proxyID.getDurableId());
+         logger.info(msg);
+         throw new CqException(msg);
+       }
+     } finally {
+       synchronized (drainsInProgressLock) {
+         numDrainsInProgress--;
+       }
+       if (testHook != null) {
+         testHook.doTestHook("DRAIN_COMPLETE");
+       }
+ 
+     }
+     return true;
+   }
+   
+   
+   /**
+    * Returns whether the proxy is alive. It is alive if its message dispatcher
+    * is processing messages.
+    *
+    * @return whether the proxy is alive
+    */
+   protected boolean isAlive()
+   {
+     if (this._messageDispatcher == null) {
+       return false;
+     }
+     return !this._messageDispatcher.isStopped();
+   }
+ 
+   /**
+    * Returns whether the proxy is paused. It is paused if its message dispatcher
+    * is paused. This only applies to durable clients.
+    *
+    * @return whether the proxy is paused
+    *
+    * @since 5.5
+    */
+   protected boolean isPaused() {
+     return this._isPaused;
+   }
+ 
+   protected void setPaused(boolean isPaused) {
+     this._isPaused = isPaused;
+   }
+ 
+   /**
+    * Closes the proxy. This method checks the message queue for any unprocessed
+    * messages and processes them for MAXIMUM_SHUTDOWN_PEEKS.
+    *
+    * @see CacheClientProxy#MAXIMUM_SHUTDOWN_PEEKS
+    */
+   protected void close()
+   {
+     close(true, false);
+   }
+ 
+   /**
+    * Set to true once this proxy starts being closed.
+    * Remains true for the rest of its existence.
+    */
+   private final AtomicBoolean closing = new AtomicBoolean(false);
+ 
+   /**
+    * Close the <code>CacheClientProxy</code>.
+    *
+    * @param checkQueue
+    *          Whether to message check the queue and process any contained
+    *          messages (up to MAXIMUM_SHUTDOWN_PEEKS).
+    * @param stoppedNormally
+    *          Whether client stopped normally
+    *
+    * @return whether to keep this <code>CacheClientProxy</code>
+    * @see CacheClientProxy#MAXIMUM_SHUTDOWN_PEEKS
+    */
+   protected boolean close(boolean checkQueue, boolean stoppedNormally) {
+     boolean pauseDurable = false;
+     // If the client is durable and either (a) it hasn't stopped normally or (b) it
+     // has stopped normally but it is configured to be kept alive, set pauseDurable
+     // to true
+     if (isDurable()
+         && (!stoppedNormally || (getDurableKeepAlive() && stoppedNormally))) {
+       pauseDurable = true;
+     }
+ 
+     boolean keepProxy = false;
+     if (pauseDurable) {
+       pauseDispatching();
+       keepProxy = true;
+     } else {
+       terminateDispatching(checkQueue);
+       closeTransientFields();
+     }
+ 
+     this.connected = false;
+ 
+     // Close the Authorization callback (if any)
+     try {
+       if (!pauseDurable) {
+         if (this.postAuthzCallback != null) {//for single user
+           this.postAuthzCallback.close();
+           this.postAuthzCallback = null;
+         }else if(this.clientUserAuths != null) {//for multiple users
+           this.clientUserAuths.cleanup(true);
+           this.clientUserAuths = null;
+         }
+       }
+     }
+     catch (Exception ex) {
+       if (this._cache.getSecurityLoggerI18n().warningEnabled()) {
+         this._cache.getSecurityLoggerI18n().warning(LocalizedStrings.TWO_ARG_COLON, new Object[] {this, ex});
+       }
+     }
+     // Notify the caller whether to keep this proxy. If the proxy is durable
+     // and should be paused, then return true; otherwise return false.
+     return keepProxy;
+   }
+ 
+   protected void pauseDispatching() {
+     if (this._messageDispatcher == null){
+       return;
+     }
+ 
+     // If this is the primary, pause the dispatcher (which closes its transient
+     // fields. Otherwise, just close the transient fields.
+     if (logger.isDebugEnabled()) {
+       logger.debug("{}: Pausing processing", this);
+     }
+     //BUGFIX for BUG#38234
+     if(!testAndSetPaused(true) && this.isPrimary) {
+       if (this._messageDispatcher != Thread.currentThread()) {
+         // don't interrupt ourself to fix bug 40611
+         this._messageDispatcher.interrupt();
+       }
+     }
+ 
+     try {
+       // Close transient fields
+       closeTransientFields();
+     } finally {
+       // make sure this gets called if closeTransientFields throws; see bug 40611
+       // Start timer
+       scheduleDurableExpirationTask();
+     }
+   }
+ 
+   private boolean testAndSetPaused(boolean newValue) {
+     
+     synchronized(this._messageDispatcher._pausedLock) {
+       if (this._isPaused != newValue) {
+         this._isPaused = newValue;
+         this._messageDispatcher._pausedLock.notifyAll();
+         return !this._isPaused;
+       }
+       else {
+         this._messageDispatcher._pausedLock.notifyAll();
+         return this._isPaused;
+       }
+     }
+   }
+   protected void terminateDispatching(boolean checkQueue) {
+     if (this._messageDispatcher == null){
+       return;
+     }
+     
+     try {
+     if (logger.isDebugEnabled()) {
+       logger.debug("{}: Terminating processing", this);
+     }
+     if (this._messageDispatcher == Thread.currentThread()) {
+       // I'm not even sure this is possible but if the dispatcher
+       // calls us then at least call stopDispatching
+       // the old code did this (I'm not even sure it is safe to do).
+       // This needs to be done without testing OR setting "closing".
+       this._messageDispatcher.stopDispatching(checkQueue);
+       this.cils[RegisterInterestTracker.interestListIndex].clearClientInterestList();
+       this.cils[RegisterInterestTracker.durableInterestListIndex].clearClientInterestList();
+       // VJR: bug 37487 fix
+       destroyRQ();
+       return;
+     }
+ 
+     if (!this.closing.compareAndSet(false, true)) {
+       // must already be closing so just return
+       // this is part of the fix for 37684
+       return;
+     }
+     // Unregister interest in all interests (if necessary) 
+     this.cils[RegisterInterestTracker.interestListIndex].clearClientInterestList();
+     this.cils[RegisterInterestTracker.durableInterestListIndex].clearClientInterestList();
+ 
+     // If the message dispatcher is paused, unpause it. The next bit of
+     // code will interrupt the waiter.
+     if (this.testAndSetPaused(false)) {
+       if (logger.isDebugEnabled()) {
+         logger.debug("{}: Paused but terminating processing", this);
+       }
+       // Cancel the expiration task
+       cancelDurableExpirationTask(false);
+     }
+ 
+     boolean alreadyDestroyed = false;
+     boolean gotInterrupt = Thread.interrupted(); // clears the flag
+     try {
+       // Stop the message dispatcher
+       this._messageDispatcher.stopDispatching(checkQueue);
+ 
+       gotInterrupt |= Thread.interrupted(); // clears the flag
+ 
+       // to fix bug 37684
+       // 1. check to see if dispatcher is still alive
+       if (this._messageDispatcher.isAlive()) {
+         closeSocket();
+         destroyRQ();
+         alreadyDestroyed = true;
+         this._messageDispatcher.interrupt();
+         if (this._messageDispatcher.isAlive()) {
+           try {
+             this._messageDispatcher.join(1000);
+           } catch (InterruptedException ex) {
+             gotInterrupt = true;
+           }
+           // if it is still alive then warn and move on
+           if (this._messageDispatcher.isAlive()) {
+             //com.gemstone.gemfire.internal.OSProcess.printStacks(com.gemstone.gemfire.internal.OSProcess.getId());
+             logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_COULD_NOT_STOP_MESSAGE_DISPATCHER_THREAD, this));
+           }
+         }
+       }
+     }
+     finally {
+       if (gotInterrupt) {
+         Thread.currentThread().interrupt();
+       }
+       if (!alreadyDestroyed) {
+         destroyRQ();
+       }
+     }
+     } finally {
+       //  Close the statistics
+       this._statistics.close(); // fix for bug 40105
+       closeTransientFields(); // make sure this happens
+     }
+   }
+ 
+   private void closeSocket() {
+     if (this._socketClosed.compareAndSet(false, true)) {
+       // Close the socket
+       this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, this._remoteHostAddress, null);
+       getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
+     }
+   }
+   
+   private void closeTransientFields() {
+     closeSocket();
+ 
+     // Null out comm buffer, host address, ports and proxy id. All will be
+     // replaced when the client reconnects.
+     releaseCommBuffer();
+     {
+       String remoteHostAddress = this._remoteHostAddress;
+       if (remoteHostAddress != null) {
+         this._cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress);
+         this._remoteHostAddress = null;
+       }
+     }
+     try {
+       this.cils[RegisterInterestTracker.interestListIndex].clearClientInterestList();
+     } catch (CacheClosedException e) {
+       // ignore if cache is shutting down
+     }
+     // Commented to fix bug 40259
+     //this.clientVersion = null;
+     closeNonDurableCqs();    
+   }
+   
+   private void releaseCommBuffer() {
+     ByteBuffer bb = this._commBuffer;
+     if (bb != null) {
+       this._commBuffer = null;
+       ServerConnection.releaseCommBuffer(bb);
+     }
+   }
+ 
+   private void closeNonDurableCqs(){
+     CqService cqService = getCache().getCqService();
+     if (cqService != null) {
+       try {
+         cqService.closeNonDurableClientCqs(getProxyID());
+       }
+       catch (CqException ex) {
+         logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_CQEXCEPTION_WHILE_CLOSING_NON_DURABLE_CQS_0, ex.getLocalizedMessage()));
+       }
+     }
+   }
+   
+   private void destroyRQ() {
+     if (this._messageDispatcher == null) {
+       return;
+     }
+     try {
+       // Using Destroy Region bcoz this method is modified in HARegion so as
+       // not to distribute.
+       // For normal Regions , even the localDestroyRegion actually propagates
+       HARegionQueue rq = this._messageDispatcher._messageQueue;
+       rq.destroy();
+ 
+       // if (!rq.getRegion().isDestroyed()) {
+       // rq.getRegion().destroyRegion();
+       // }
+     }
+     catch (RegionDestroyedException rde) {
+ //      throw rde;
+     }
+     catch (CancelException e) {
+ //      throw e;
+     }
+     catch (Exception warning) {
+       logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_EXCEPTION_IN_CLOSING_THE_UNDERLYING_HAREGION_OF_THE_HAREGIONQUEUE, this), warning);
+     }
+   }
+ 
+   public void registerInterestRegex(String regionName, String regex,
+       boolean isDurable) {
+     registerInterestRegex(regionName, regex, isDurable, true);
+   }
+   
+   public void registerInterestRegex(String regionName, String regex,
+       boolean isDurable, boolean receiveValues) {
+     if (this.isPrimary) {
+       // Notify all secondaries and client of change in interest
+       notifySecondariesAndClient(regionName, regex, InterestResultPolicy.NONE,
+           isDurable, receiveValues, InterestType.REGULAR_EXPRESSION);
+     } else {
+       throw new IllegalStateException(LocalizedStrings.CacheClientProxy_NOT_PRIMARY.toLocalizedString());
+     }
+   }
+ 
+   public void registerInterest(String regionName, Object keyOfInterest,
+       InterestResultPolicy policy, boolean isDurable) {
+     registerInterest(regionName, keyOfInterest, policy, isDurable, true);
+   }
+   
+   public void registerInterest(String regionName, Object keyOfInterest,
+       InterestResultPolicy policy, boolean isDurable,
+       boolean receiveValues) {
+     if (keyOfInterest instanceof String && keyOfInterest.equals("ALL_KEYS")) {
+       registerInterestRegex(regionName, ".*", isDurable, receiveValues);
+     } else if (keyOfInterest instanceof List) {
+       if (this.isPrimary) {
+         notifySecondariesAndClient(regionName, keyOfInterest, policy,
+             isDurable, receiveValues, InterestType.KEY);
+       } else {
+         throw new IllegalStateException(LocalizedStrings.CacheClientProxy_NOT_PRIMARY.toLocalizedString());
+       }
+     } else {
+       if (this.isPrimary) {
+         // Notify all secondaries and client of change in interest
+         notifySecondariesAndClient(regionName, keyOfInterest, policy,
+             isDurable, receiveValues, InterestType.KEY);
+         
+         // Enqueue the initial value message for the client if necessary
+         if (policy == InterestResultPolicy.KEYS_VALUES) {
+           Get70 request = (Get70)Get70.getCommand();
+           LocalRegion lr = (LocalRegion) this._cache.getRegion(regionName);
+           Get70.Entry entry = request.getValueAndIsObject(lr, keyOfInterest, null,
+               null);
+           boolean isObject = entry.isObject;
+           byte[] value = null;
+           if (entry.value instanceof byte[]) {
+             value = (byte[])entry.value;
+           } else {
+             try {
+               value = CacheServerHelper.serialize(entry.value);
+             } catch (IOException e) {
+               logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_THE_FOLLOWING_EXCEPTION_OCCURRED_0, entry.value), e);
+             }
+           }
+           VersionTag tag = entry.versionTag;
+           ClientUpdateMessage updateMessage = new ClientUpdateMessageImpl(
+               EnumListenerEvent.AFTER_CREATE, lr, keyOfInterest, value, null,
+               (isObject ? (byte) 0x01 : (byte) 0x00), null, this.proxyID,
+               new EventID(this._cache.getDistributedSystem()), tag);
+           CacheClientNotifier.routeSingleClientMessage(updateMessage, this.proxyID);
+         }
+         // Add the client to the region's filters
+         //addFilterRegisteredClients(regionName, keyOfInterest);
+       } else {
+         throw new IllegalStateException(LocalizedStrings.CacheClientProxy_NOT_PRIMARY.toLocalizedString());
+       }
+     }
+   }
+ 
+   private void notifySecondariesAndClient(String regionName,
+       Object keyOfInterest, InterestResultPolicy policy, boolean isDurable,
+       boolean receiveValues, int interestType) {
+     // Create a client interest message for the keyOfInterest
+     ClientInterestMessageImpl message = new ClientInterestMessageImpl(
+         new EventID(this._cache.getDistributedSystem()), regionName,
+         keyOfInterest, interestType, policy.getOrdinal(), isDurable,
+         !receiveValues, ClientInterestMessageImpl.REGISTER);
+ 
+     // Notify all secondary proxies of a change in interest
+     notifySecondariesOfInterestChange(message);
+ 
+     // Modify interest registration
+     if (keyOfInterest instanceof List) {
+       registerClientInterestList(regionName, (List) keyOfInterest, isDurable,
+           !receiveValues, true);
+     } else {
+       registerClientInterest(regionName, keyOfInterest, interestType,
+           isDurable, !receiveValues, true);
+     }
+ 
+     // Enqueue the interest registration message for the client.
+     // If the client is not 7.0.1 or greater and the key of interest is a list,
+     // then create an individual message for each entry in the list since the
+     // client doesn't support a ClientInterestMessageImpl containing a list.
+     if (Version.GFE_701.compareTo(this.clientVersion) > 0
+         && keyOfInterest instanceof List) {
+       for (Iterator i = ((List) keyOfInterest).iterator(); i.hasNext();) {
+         this._messageDispatcher.enqueueMessage(new ClientInterestMessageImpl(
+             new EventID(this._cache.getDistributedSystem()), regionName,
+             i.next(), interestType, policy.getOrdinal(), isDurable, !receiveValues,
+             ClientInterestMessageImpl.REGISTER));
+      }
+     } else {
+       this._messageDispatcher.enqueueMessage(message);
+     }
+   }
+ 
+   public void unregisterInterestRegex(String regionName, String regex,
+       boolean isDurable) {
+     unregisterInterestRegex(regionName, regex, isDurable, true);
+   }
+   
+   public void unregisterInterestRegex(String regionName, String regex,
+       boolean isDurable, boolean receiveValues) {
+     if (this.isPrimary) {
+       notifySecondariesAndClient(regionName, regex, isDurable, receiveValues,
+           InterestType.REGULAR_EXPRESSION);
+     } else {
+       throw new IllegalStateException(LocalizedStrings.CacheClientProxy_NOT_PRIMARY.toLocalizedString());
+     }
+   }
+   
+   public void unregisterInterest(String regionName, Object keyOfInterest,
+       boolean isDurable) {
+     unregisterInterest(regionName, keyOfInterest, isDurable, true);
+   }
+   
+   public void unregisterInterest(String regionName, Object keyOfInterest,
+       boolean isDurable, boolean receiveValues) {
+     if (keyOfInterest instanceof String && keyOfInterest.equals("ALL_KEYS")) {
+       unregisterInterestRegex(regionName, ".*", isDurable, receiveValues);
+     } else {
+       if (this.isPrimary) {
+         notifySecondariesAndClient(regionName, keyOfInterest, isDurable,
+             receiveValues, InterestType.KEY);
+       } else {
+         throw new IllegalStateException(LocalizedStrings.CacheClientProxy_NOT_PRIMARY.toLocalizedString());
+       }
+     }
+   }
+ 
+   private void notifySecondariesAndClient(String regionName,
+       Object keyOfInterest, boolean isDurable, boolean receiveValues,
+       int interestType) {
+     // Notify all secondary proxies of a change in interest
+     ClientInterestMessageImpl message = new ClientInterestMessageImpl(
+         new EventID(this._cache.getDistributedSystem()), regionName,
+         keyOfInterest, interestType, (byte) 0, isDurable, !receiveValues,
+         ClientInterestMessageImpl.UNREGISTER);
+     notifySecondariesOfInterestChange(message);
+  
+     // Modify interest registration
+     if (keyOfInterest instanceof List) {
+       unregisterClientInterest(regionName, (List) keyOfInterest, false);
+     } else {
+       unregisterClientInterest(regionName, keyOfInterest, interestType,
+           false);
+     }
+  
+     // Enqueue the interest unregistration message for the client.
+     // If the client is not 7.0.1 or greater and the key of interest is a list,
+     // then create an individual message for each entry in the list since the
+     // client doesn't support a ClientInterestMessageImpl containing a list.
+     if (Version.GFE_701.compareTo(this.clientVersion) > 0
+         && keyOfInterest instanceof List) {
+       for (Iterator i = ((List) keyOfInterest).iterator(); i.hasNext();) {
+         this._messageDispatcher.enqueueMessage(new ClientInterestMessageImpl(
+             new EventID(this._cache.getDistributedSystem()), regionName,
+             i.next(), interestType, (byte) 0, isDurable, !receiveValues,
+             ClientInterestMessageImpl.UNREGISTER));
+       }
+     } else {
+       this._messageDispatcher.enqueueMessage(message);
+     }
+   }
+   
+   protected void notifySecondariesOfInterestChange(ClientInterestMessageImpl message) {
+     if (logger.isDebugEnabled()) {
+       StringBuffer subBuffer = new StringBuffer();
+       if (message.isRegister()) {
+         subBuffer
+           .append("register ")
+           .append(message.getIsDurable() ? "" : "non-")
+           .append("durable interest in ");
+       } else {
+         subBuffer.append("unregister interest in ");
+       }
+       StringBuffer buffer = new StringBuffer();
+       buffer
+         .append(this)
+         .append(": Notifying secondary proxies to ")
+         .append(subBuffer.toString())
+         .append(message.getRegionName())
+         .append("->")
+         .append(message.getKeyOfInterest())
+         .append("->")
+         .append(InterestType.getString(message.getInterestType()));
+       logger.debug(buffer.toString());
+     }
+     this._cacheClientNotifier.deliverInterestChange(this.proxyID, message);
+   }
+ 
+   /*
+   protected void addFilterRegisteredClients(String regionName,
+       Object keyOfInterest) {
+     try {
+       this._cacheClientNotifier.addFilterRegisteredClients(regionName,
+           this.proxyID);
+     } catch (RegionDestroyedException e) {
+       logger.warn(LocalizedStrings.CacheClientProxy_0_INTEREST_REG_FOR_0_FAILED, regionName + "->" + keyOfInterest, e);
+     }
+   }
+   */
+   
+   /**
+    * Registers interest in the input region name and key
+    *
+    * @param regionName
+    *          The fully-qualified name of the region in which to register
+    *          interest
+    * @param keyOfInterest
+    *          The key in which to register interest
+    */
+   protected void registerClientInterest(String regionName,
+       Object keyOfInterest, int interestType, boolean isDurable,
+       boolean sendUpdatesAsInvalidates, boolean flushState)
+   {
+     ClientInterestList cil =
+       this.cils[RegisterInterestTracker.getInterestLookupIndex(
+           isDurable, false)];
+     cil.registerClientInterest(regionName, keyOfInterest, interestType, sendUpdatesAsInvalidates);
+     if (flushState) {
+       flushForInterestRegistration(regionName, this._cache.getDistributedSystem().getDistributedMember());
+     }
+     HARegionQueue queue = getHARegionQueue();
+     if (queue != null) { // queue is null during initialization
+       queue.setHasRegisteredInterest(true);
+     }
+   }
+   
+   /**
+    * flush other regions to the given target.  This is usually the member
+    * that is registering the interest.  During queue creation it is the
+    * queue's image provider.
+    */
+   public void flushForInterestRegistration(String regionName, DistributedMember target) {
+     Region r = this._cache.getRegion(regionName);
+     if (r == null) {
+       if (logger.isDebugEnabled()) {
+         logger.debug("Unable to find region '{}' to flush for interest registration", regionName);
+       }
+     } else if (r.getAttributes().getScope().isDistributed()) {
+       if (logger.isDebugEnabled()){
+         logger.debug("Flushing region '{}' for interest registration", regionName);
+       }
+       CacheDistributionAdvisee cd = (CacheDistributionAdvisee)r;
+       final StateFlushOperation sfo;
+       if (r instanceof PartitionedRegion) {
+         // need to flush all buckets.  SFO should be changed to target buckets
+         // belonging to a particular PR, but it doesn't have that option right now
+         sfo = new StateFlushOperation(
+             this._cache.getDistributedSystem().getDistributionManager());
+       } else {
+         sfo = new StateFlushOperation((DistributedRegion)r);
+       }
+       try {
+         // bug 41681 - we need to flush any member that may have a cache operation
+         // in progress so that the changes are received there before returning
+         // from this method
+         InitialImageAdvice advice = cd.getCacheDistributionAdvisor().adviseInitialImage(null);
+         HashSet recips = new HashSet(advice.getReplicates());
+         recips.addAll(advice.getUninitialized());
+         recips.addAll(advice.getEmpties());
+         recips.addAll(advice.getPreloaded());
+         recips.addAll(advice.getOthers());
+         sfo.flush(recips,
+             target,
+             DistributionManager.HIGH_PRIORITY_EXECUTOR, true);
+       } catch (InterruptedException ie) {
+         Thread.currentThread().interrupt();
+         return;
+       }
+     }
+   }
+ 
+   /**
+    * Unregisters interest in the input region name and key
+    *
+    * @param regionName
+    *          The fully-qualified name of the region in which to unregister
+    *          interest
+    * @param keyOfInterest
+    *          The key in which to unregister interest
+    * @param isClosing
+    *          Whether the caller is closing
+    */
+   protected void unregisterClientInterest(String regionName,
+       Object keyOfInterest, int interestType, boolean isClosing)
+   {
+     // only unregister durable interest if isClosing and !keepalive
+     if (!isClosing /* explicit unregister */
+         || !getDurableKeepAlive() /* close and no keepAlive*/) {
+       this.cils[RegisterInterestTracker.durableInterestListIndex].
+         unregisterClientInterest(regionName, keyOfInterest, interestType);
+     }
+     // always unregister non durable interest
+     this.cils[RegisterInterestTracker.interestListIndex].
+       unregisterClientInterest(regionName, keyOfInterest, interestType);
+   }
+ 
+   /**
+    * Registers interest in the input region name and list of keys
+    *
+    * @param regionName
+    *          The fully-qualified name of the region in which to register
+    *          interest
+    * @param keysOfInterest
+    *          The list of keys in which to register interest
+    */
+   protected void registerClientInterestList(String regionName,
+       List keysOfInterest, boolean isDurable, boolean sendUpdatesAsInvalidates,
+       boolean flushState)
+   {
+     // we only use two interest lists to map the non-durable and durable
+     // identifiers to their interest settings
+     ClientInterestList cil =
+       this.cils[RegisterInterestTracker.getInterestLookupIndex(
+           isDurable, false/*sendUpdatesAsInvalidates*/)];
+     cil.registerClientInterestList(regionName, keysOfInterest, sendUpdatesAsInvalidates);
+     if (getHARegionQueue() != null) {
+       if (flushState) {
+         flushForInterestRegistration(regionName, this._cache.getDistributedSystem().getDistributedMember());
+       }
+       getHARegionQueue().setHasRegisteredInterest(true);
+     }
+   }
+ 
+   /**
+    * Unregisters interest in the input region name and list of keys
+    *
+    * @param regionName
+    *          The fully-qualified name of the region in which to unregister
+    *          interest
+    * @param keysOfInterest
+    *          The list of keys in which to unregister interest
+    * @param isClosing
+    *          Whether the caller is closing
+    */
+   protected void unregisterClientInterest(String regionName,
+       List keysOfInterest, boolean isClosing)
+   {
+     // only unregister durable interest if isClosing and !keepalive
+     if (!isClosing /* explicit unregister */
+         || !getDurableKeepAlive() /* close and no keepAlive*/) {
+       this.cils[RegisterInterestTracker.durableInterestListIndex].
+         unregisterClientInterestList(regionName, keysOfInterest);
+     }
+     // always unregister non durable interest
+     this.cils[RegisterInterestTracker.interestListIndex].
+       unregisterClientInterestList(regionName, keysOfInterest);
+   }
+ 
+ 
+   /** sent by the cache client notifier when there is an interest registration change */
+   protected void processInterestMessage(ClientInterestMessageImpl message) { 
+     int interestType = message.getInterestType(); 
+     String regionName = message.getRegionName(); 
+     Object key = message.getKeyOfInterest(); 
+     if (message.isRegister()) {
+       // Register interest in this region->key
+       if (key instanceof List) {
+         registerClientInterestList(regionName, (List) key,
+             message.getIsDurable(), message.getForUpdatesAsInvalidates(), true);
+       } else {
+         registerClientInterest(regionName, key, interestType,
+             message.getIsDurable(), message.getForUpdatesAsInvalidates(), true);
+       }
+        
+       // Add the client to the region's filters 
+       //addFilterRegisteredClients(regionName, key); 
+  
+       if (logger.isDebugEnabled()) { 
+         StringBuffer buffer = new StringBuffer();
+         buffer 
+           .append(this) 
+           .append(": Interest listener registered ") 
+           .append(message.getIsDurable() ? "" : "non-") 
+           .append("durable interest in ") 
+           .append(message.getRegionName()) 
+           .append("->") 
+           .append(message.getKeyOfInterest())
+           .append("->")
+           .append(InterestType.getString(message.getInterestType()));
+         logger.debug(buffer.toString()); 
+       } 
+     } else { 
+       // Unregister interest in this region->key 
+       if (key instanceof List) {
+         unregisterClientInterest(regionName, (List) key, false);
+       } else {
+         unregisterClientInterest(regionName, key, interestType, false);
+       }
+        
+       if (logger.isDebugEnabled()) { 
+         StringBuffer buffer = new StringBuffer(); 
+         buffer 
+           .append(this) 
+           .append(": Interest listener unregistered interest in ") 
+           .append(message.getRegionName()) 
+           .append("->") 
+           .append(message.getKeyOfInterest())
+           .append("->")
+           .append(InterestType.getString(message.getInterestType()));
+         logger.debug(buffer.toString()); 
+       } 
+     } 
+   } 
+ 
+   private boolean postDeliverAuthCheckPassed(ClientUpdateMessage clientMessage) {
+     // Before adding it in the queue for dispatching, check for post
+     // process authorization
+     if (AcceptorImpl.isAuthenticationRequired()
+         && this.postAuthzCallback == null
+         && AcceptorImpl.isPostAuthzCallbackPresent()) {
+       // security is on and callback is null: it means multiuser mode.
+       ClientUpdateMessageImpl cumi = (ClientUpdateMessageImpl)clientMessage;
+ 
+       CqNameToOp clientCq = cumi.getClientCq(this.proxyID);
+ 
+       if (clientCq != null && !clientCq.isEmpty()) {
+         if (logger.isDebugEnabled()) {
+           logger.debug("CCP clientCq size before processing auth {}", clientCq.size());
+         }
+         String[] regionNameHolder = new String[1];
+         OperationContext opctxt = getOperationContext(clientMessage,
+             regionNameHolder);
+         if (opctxt == null) {
+           logger.warn(LocalizedMessage.create(
+               LocalizedStrings.CacheClientProxy__0_NOT_ADDING_MESSAGE_TO_QUEUE_1_BECAUSE_THE_OPERATION_CONTEXT_OBJECT_COULD_NOT_BE_OBTAINED_FOR_THIS_CLIENT_MESSAGE,
+               new Object[] {this, clientMessage}));
+           return false;
+         }
+ 
+         String[] cqNames = clientCq.getNames();
+         if (logger.isDebugEnabled()) {
+           logger.debug("CCP clientCq names array size {}", cqNames.length);
+         }
+         for (int i = 0; i < cqNames.length; i++) {
+           try {
+             if (logger.isDebugEnabled()) {
+               logger.debug("CCP clientCq name {}", cqNames[i]);
+             }
+             boolean isAuthorized = false;
+ 
+             if (this.proxyID.isDurable() && this.getDurableKeepAlive()
+                 && this._isPaused) {
+               // need to take lock as we may be reinitializing proxy cache
+               synchronized (this.clientUserAuthsLock) {
+                 AuthorizeRequestPP postAuthCallback = this.clientUserAuths
+                     .getUserAuthAttributes(cqNames[i]).getPostAuthzRequest();
+                 if (logger.isDebugEnabled() && postAuthCallback == null) {
+                   logger.debug("CCP clientCq post callback is null");
+                 }
+                 if (postAuthCallback != null && postAuthCallback
+                     .getPostAuthzCallback().authorizeOperation(
+                         regionNameHolder[0], opctxt)) {
+                   isAuthorized = true;
+                 }
+               }
+             } else {
+               UserAuthAttributes userAuthAttributes = this.clientUserAuths
+                   .getUserAuthAttributes(cqNames[i]);
+ 
+               AuthorizeRequestPP postAuthCallback = userAuthAttributes
+                   .getPostAuthzRequest();
+               if (postAuthCallback == null && logger.isDebugEnabled()) {
+                 logger.debug("CCP clientCq post callback is null");
+               }
+               if (postAuthCallback != null && postAuthCallback
+                   .getPostAuthzCallback().authorizeOperation(
+                       regionNameHolder[0], opctxt)) {
+                 isAuthorized = true;
+               }
+             }
+ 
+             if (!isAuthorized) {
+               logger.warn(LocalizedMessage.create(
+                   LocalizedStrings.CacheClientProxy__0_NOT_ADDING_CQ_MESSAGE_TO_QUEUE_1_BECAUSE_AUTHORIZATION_FAILED,
+                   new Object[] {this, clientMessage}));
+               clientCq.delete(cqNames[i]);
+             }
+           } catch (Exception ex) {
+             // ignore...
+           }
+           if (logger.isDebugEnabled()) {
+             logger.debug("CCP clientCq size after processing auth {}", clientCq.size());
+           }
+         }
+         // again need to check as there may be no CQ available
+         if (!clientMessage.hasCqs(this.proxyID)) {
+           this._statistics.incMessagesNotQueuedNotInterested();
+           if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) {
+             logger.debug("{}: Not adding message to queue. It is not interested in this region and key: {}", clientMessage);
+           }
+           return false;
+         }
+       }
+     }
+     else if (this.postAuthzCallback != null) {
+       String[] regionNameHolder = new String[1];
+       boolean isAuthorize = false;
+       OperationContext opctxt = getOperationContext(clientMessage,
+           regionNameHolder);
+       if (opctxt == null) {
+         logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy__0_NOT_ADDING_MESSAGE_TO_QUEUE_1_BECAUSE_THE_OPERATION_CONTEXT_OBJECT_COULD_NOT_BE_OBTAINED_FOR_THIS_CLIENT_MESSAGE, new Object[] {this, clientMessage}));
+         return false;
+       }
+       if (logger.isTraceEnabled()){
+         logger.trace("{}: Invoking authorizeOperation for message: {}", this, clientMessage);
+       }
+ 
+       if (this.proxyID.isDurable() && this.getDurableKeepAlive()
+           && this._isPaused) {
+         synchronized (this.clientUserAuthsLock) {
+           isAuthorize = this.postAuthzCallback.authorizeOperation(
+               regionNameHolder[0], opctxt);
+         }
+       } else {
+         isAuthorize = this.postAuthzCallback.authorizeOperation(
+             regionNameHolder[0], opctxt);
+       }
+       if (!isAuthorize) {
+         logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy__0_NOT_ADDING_MESSAGE_TO_QUEUE_1_BECAUSE_AUTHORIZATION_FAILED, new Object[] {this, clientMessage}));
+         return false;
+       }
+     }
+ 
+     return true;
+   }
+ 
+   /**
+    * Delivers the message to the client representing this client proxy.
+    * @param conflatable 
+    */
+   protected void deliverMessage(Conflatable conflatable)
+   {
+     ClientUpdateMessage clientMessage = null;
+     if(conflatable instanceof HAEventWrapper) {
+       clientMessage = ((HAEventWrapper)conflatable).getClientUpdateMessage();
+     } else {
+       clientMessage = (ClientUpdateMessage)conflatable;
+     } 
+ 
+     this._statistics.incMessagesReceived();
+ 
+     if (clientMessage.needsNoAuthorizationCheck() || postDeliverAuthCheckPassed(clientMessage)) {
+       // If dispatcher is getting initialized, add the event to temporary queue.
+       if (this.messageDispatcherInit) {
+         synchronized (this.queuedEventsSync) {
+           if (this.messageDispatcherInit) {  // Check to see value did not changed while getting the synchronize lock.
+             if (logger.isDebugEnabled()) {
+               logger.debug("Message dispatcher for proxy {} is getting initialized. Adding message to the queuedEvents.", this);
+             }
+             this.queuedEvents.add(conflatable);
+             return;
+           }
+         }
+       }
+       
+       if (this._messageDispatcher != null) {
+         this._messageDispatcher.enqueueMessage(conflatable);
+       } else {
+         this._statistics.incMessagesFailedQueued();
+         if (logger.isDebugEnabled()) {
+           logger.debug("Message is not added to the queue. Message dispatcher for proxy: {} doesn't exist.", this);
+         }
+       }
+     } else {
+       this._statistics.incMessagesFailedQueued();
+     }
+   }
+ 
+   protected void sendMessageDirectly(ClientMessage message) {
+     // Send the message directly if the connection exists
+     // (do not go through the queue).
+     if (logger.isDebugEnabled()){
+       logger.debug("About to send message directly to {}", this);
+     }
+     if (this._messageDispatcher != null && this._socket != null && !this._socket.isClosed()) {
+       // If the socket is open, send the message to it
+       this._messageDispatcher.sendMessageDirectly(message);
+       if (logger.isDebugEnabled()){
+         logger.debug("Sent message directly to {}", this);
+       }
+     } else {
+       // Otherwise just reset the ping counter
+       resetPingCounter();
+       if (logger.isDebugEnabled()){
+         logger.debug("Skipped sending message directly to {}", this);
+       }
+     }
+   }
+   
+   private OperationContext getOperationContext(ClientMessage cmsg,
+       String[] regionNameHolder) {
+     ClientUpdateMessageImpl cmsgimpl = (ClientUpdateMessageImpl)cmsg;
+     OperationContext opctxt = null;
+     // TODO SW: Special handling for DynamicRegions; this should be reworked
+     // when DynamicRegion API is deprecated
+     String regionName = cmsgimpl.getRegionName();
+     regionNameHolder[0] = regionName;
+     if (cmsgimpl.isCreate()) {
+       if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
+         regionNameHolder[0] = (String)cmsgimpl.getKeyOfInterest();
+         opctxt = new RegionCreateOperationContext(true);
+       }
+       else {
+         PutOperationContext tmp = new PutOperationContext(cmsgimpl.getKeyOfInterest(), cmsgimpl
+             .getValue(), cmsgimpl.valueIsObject(), PutOperationContext.CREATE,
+             true);
+         tmp.setCallbackArg(cmsgimpl.getCallbackArgument());
+         opctxt = tmp;
+       }
+     }
+     else if (cmsgimpl.isUpdate()) {
+       if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
+         regionNameHolder[0] = (String)cmsgimpl.getKeyOfInterest();
+         opctxt = new RegionCreateOperationContext(true);
+       }
+       else {
+         PutOperationContext tmp = new PutOperationContext(cmsgimpl.getKeyOfInterest(), cmsgimpl
+             .getValue(), cmsgimpl.valueIsObject(), PutOperationContext.UPDATE,
+             true);
+         tmp.setCallbackArg(cmsgimpl.getCallbackArgument());
+         opctxt = tmp;
+       }
+     }
+     else if (cmsgimpl.isDestroy()) {
+       if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
+         regionNameHolder[0] = (String)cmsgimpl.getKeyOfInterest();
+         opctxt = new RegionDestroyOperationContext(true);
+       }
+       else {
+         DestroyOperationContext tmp = new DestroyOperationContext(cmsgimpl.getKeyOfInterest(), true);
+         tmp.setCallbackArg(cmsgimpl.getCallbackArgument());
+         opctxt = tmp;
+       }
+     }
+     else if (cmsgimpl.isDestroyRegion()) {
+       opctxt = new RegionDestroyOperationContext(true);
+     }
+     else if (cmsgimpl.isInvalidate()) {
+       InvalidateOperationContext tmp = new InvalidateOperationContext(cmsgimpl.getKeyOfInterest(), true);
+       tmp.setCallbackArg(cmsgimpl.getCallbackArgument());
+       opctxt = tmp;
+     }
+     else if (cmsgimpl.isClearRegion()) {
+       RegionClearOperationContext tmp = new RegionClearOperationContext(true);
+       tmp.setCallbackArg(cmsgimpl.getCallbackArgument());
+       opctxt = tmp;
+     }
+     return opctxt;
+   }
+ 
+   /**
+    * Initializes the message dispatcher thread. The
+    * <code>MessageDispatcher</code> processes the message queue.
+    *
+    * @throws CacheException
+    */
+   public void initializeMessageDispatcher() throws CacheException
+   {
+     this.messageDispatcherInit = true; // Initialization process.
+     try {
+       if (logger.isDebugEnabled()) {
+         logger.debug("{}: Initializing message dispatcher with capacity of {} entries", this, _maximumMessageCount);
+       }
+       String name = "Client Message Dispatcher for "
+         + getProxyID().getDistributedMember() + (isDurable()? " (" + getDurableId()+")" : "");
+       this._messageDispatcher = new MessageDispatcher(this, name);
+       
+       //Fix for 41375 - drain as many of the queued events
+       //as we can without synchronization.
+       if (logger.isDebugEnabled()) {
+         logger.debug("{} draining {} events from init queue into intialized queue", this, this.queuedEvents.size());
+       }
+       Conflatable nextEvent;
+       while((nextEvent = queuedEvents.poll()) != null) {
+         this._messageDispatcher.enqueueMessage(nextEvent);
+       }
+       
+       //Now finish emptying the queue with synchronization to make
+       //sure we don't miss any events.
+       synchronized (this.queuedEventsSync){
+           while((nextEvent = queuedEvents.poll()) != null) {
+             this._messageDispatcher.enqueueMessage(nextEvent);
+           }
+           
+           this.messageDispatcherInit = false; // Done initialization.
+       }
+     } finally {
+       if (this.messageDispatcherInit) { // If its not successfully completed.
+         this._statistics.close();
+       }
+     }
+   }
+ 
+   protected void startOrResumeMessageDispatcher(boolean processedMarker) {
+     // Only start or resume the dispatcher if it is Primary
+     if (this.isPrimary) {
+       // Add the marker to the queue
+       if (!processedMarker) {
+         EventID eventId = new EventID(this._cache.getDistributedSystem());
+         this._messageDispatcher.enqueueMarker(new ClientMarkerMessageImpl(eventId));
+       }
+ 
+       // Set the message queue to primary.
+       this._messageDispatcher._messageQueue.setPrimary(true);
+ 
+       // Start or resume the dispatcher
+       synchronized (this._messageDispatcher._pausedLock) {
+         if (this.isPaused()) {
+           // It is paused, resume it
+           this.setPaused(false);
+           if (this._messageDispatcher.isStopped()) {
+             if (logger.isDebugEnabled()) {
+               logger.debug("{}: Starting dispatcher", this);
+             }
+             this._messageDispatcher.start();
+           }
+           else {
+             // ARB: Initialize transient fields.
+             this._messageDispatcher.initializeTransients();
+             if (logger.isDebugEnabled()) {
+               logger.debug("{}: Resuming dispatcher", this);
+             }
+             this._messageDispatcher.resumeDispatching();
+           }
+         } else if (!this._messageDispatcher.isAlive()) {
+           if (logger.isDebugEnabled()) {
+             logger.debug("{}: Starting dispatcher", this);
+           }
+           this._messageDispatcher.start();
+         }
+       }
+     }
+   }
+ 
+   /*
+    * Returns whether the client represented by this <code> CacheClientProxy
+    * </code> has registered interest in anything. @return whether the client
+    * represented by this <code> CacheClientProxy </code> has registered interest
+    * in anything
+    */
+   protected boolean hasRegisteredInterested()
+   {
+     return
+     this.cils[RegisterInterestTracker.interestListIndex].hasInterest() ||
+     this.cils[RegisterInterestTracker.durableInterestListIndex].hasInterest();
+   }
+ 
+   /**
+    * Returns a string representation of the proxy
+    */
+   @Override
+   public String toString()
+   {
+     StringBuffer buffer = new StringBuffer();
+     buffer.append("CacheClientProxy[")
+       // .append("client proxy id=")
+       .append(this.proxyID)
+       // .append("; client host name=")
+       // .append(this._socket.getInetAddress().getCanonicalHostName())
+       // .append("; client host address=")
+       // .append(this._remoteHostAddress)
+       .append("; port=").append(this._socket.getPort())
+       .append("; primary=").append(isPrimary)
+       .append("; version=").append(clientVersion)
+       .append("]");
+     return buffer.toString();
+   }
+   
+   public String getState(){
+     StringBuffer buffer = new StringBuffer();
+     buffer.append("CacheClientProxy[")
+       // .append("client proxy id=")
+       .append(this.proxyID)
+       // .append("; client host name=")
+       // .append(this._socket.getInetAddress().getCanonicalHostName())
+       // .append("; client host address=")
+       // .append(this._remoteHostAddress)
+       .append("; port=").append(this._socket.getPort())
+       .append("; primary=").append(isPrimary)
+       .append("; version=").append(clientVersion)
+       .append("; paused=").append(isPaused())
+       .append("; alive=").append(isAlive())
+       .append("; connected=").append(isConnected())
+       .append("; isMarkedForRemoval=").append(isMarkedForRemoval)
+       .append("]");
+     return buffer.toString();
+   }
+ 
+   public boolean isPrimary()
+   {
+     //boolean primary = this._messageDispatcher.isAlive()
+     //    || this._messageDispatcher._messageQueue.isPrimary();
+     boolean primary = this.isPrimary;
+     //System.out.println(this + ": DISPATCHER IS ALIVE: " + this._messageDispatcher.isAlive());
+     //System.out.println(this + ": DISPATCHER QUEUE IS PRIMARY: " + this._messageDispatcher._messageQueue.isPrimary());
+     //System.out.println(this + ": IS PRIMARY: " + primary);
+     return primary;
+     // return this.isPrimary ;
+   }
+ 
+   protected boolean basicIsPrimary() {
+     return this.isPrimary;
+   }
+ 
+   protected void setPrimary(boolean isPrimary) {
+     this.isPrimary = isPrimary;
+   }
+ 
+   // private static int nextId = 0;
+   // static protected int getNextId() {
+   // synchronized (CacheClientProxy.class) {
+   // return ++nextId;
+   // }
+   // }
+   /*
+    * Return this client's HA region queue
+    * @returns - HARegionQueue of the client
+    */
+    public HARegionQueue getHARegionQueue() {
+      if (this._messageDispatcher != null){
+        return _messageDispatcher._messageQueue;
+      }
+      return null;
+    }
+ 
+ 
+   /**
+    * Reinitialize a durable <code>CacheClientProxy</code> with a new client.
+    * @param socket
+    *          The socket between the server and the client
+    * @param ip
+    *          whether this proxy represents the primary
+    */
+   protected void reinitialize(Socket socket, ClientProxyMembershipID proxyId,
+       Cache cache, boolean ip, byte cc, Version ver) {
+     // Re-initialize transient fields
+     initializeTransientFields(socket, proxyId, ip, cc, ver);
+     getCacheClientNotifier().getAcceptorStats().incCurrentQueueConnections();
+ 
+ 
+     // Cancel expiration task
+     cancelDurableExpirationTask(true);
+ 
+     // Set the message dispatcher's primary flag. This could go from primary
+     // to secondary
+     this._messageDispatcher._messageQueue.setPrimary(ip);
+     this._messageDispatcher._messageQueue.setClientConflation(cc);
+ 
+     reinitializeClientAuths();
+     this.creationDate = new Date();
+     if (logger.isDebugEnabled()) {
+       logger.debug("{}: Has been reinitialized", this);
+     }
+   }
+ 
+   protected boolean isDurable() {
+     return getProxyID().isDurable();
+   }
+ 
+   protected String getDurableId() {
+     return getProxyID().getDurableId();
+   }
+ 
+   protected int getDurableTimeout() {
+     return getProxyID().getDurableTimeout();
+   }
+ 
+   private boolean getDurableKeepAlive() {
+     return this.keepalive;
+   }
+ 
+   protected String getHARegionName() {
+     return getProxyID().getHARegionName();
+   }
+ 
+   public Region getHARegion() {
+     return this._messageDispatcher._messageQueue.getRegion();
+   }
+   
+   public Version getVersion() {
+     return this.clientVersion;
+   }
+   
+   protected void scheduleDurableExpirationTask() {
+     SystemTimer.SystemTimerTask task = new SystemTimer.SystemTimerTask() {
+       @Override
+       public void run2() {
+         _durableExpirationTask.compareAndSet(this, null);
+         logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0__THE_EXPIRATION_TASK_HAS_FIRED_SO_THIS_PROXY_IS_BEING_TERMINATED, CacheClientProxy.this));
+         // Remove the proxy from the CacheClientNofier's registry
+         getCacheClientNotifier().removeClientProxy(CacheClientProxy.this);
+         getCacheClientNotifier().durableClientTimedOut(CacheClientProxy.this.proxyID);
+ 
+         // Close the proxy
+         terminateDispatching(false);
+         _cacheClientNotifier._statistics.incQueueDroppedCount();
+ 
+         /**
+          * Setting the expiration task to null again and cancelling existing
+          * one, if any. See #50894.
+          * <p/>
+          * The message dispatcher may again set the expiry task in below path:
+          * <code>
+          *  com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy.scheduleDurableExpirationTask(CacheClientProxy.java:2020)
+          *  com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy.pauseDispatching(CacheClientProxy.java:924)
+          *  com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy$MessageDispatcher.pauseOrUnregisterProxy(CacheClientProxy.java:2813)
+          *  com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy$MessageDispatcher.run(CacheClientProxy.java:2692)
+          * </code>
+          * <p/>
+          * This is because message dispatcher may get an IOException with
+          * "Proxy closing due to socket being closed locally" during/after
+          * terminateDispatching(false) above.
+          */
+         Object task = _durableExpirationTask.getAndSet(null);
+         if (task != null) {
+           ((SystemTimerTask)task).cancel();
+         }
+       }
+ 
+     };
+     if(this._durableExpirationTask.compareAndSet(null, task)) {
+       _cache.getCCPTimer().schedule(task,
+           getDurableTimeout()*1000L);
+     }
+   }
+ 
+   protected void cancelDurableExpirationTask(boolean logMessage) {
+     SystemTimer.SystemTimerTask task = (SystemTimerTask) _durableExpirationTask.getAndSet(null);
+     if (task != null) {
+       if (logMessage) {
+         logger.info(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_CANCELLING_EXPIRATION_TASK_SINCE_THE_CLIENT_HAS_RECONNECTED, this));
+       }
+       task.cancel();
+     }
+   }
+ 
+   /**
+    * Class <code>ClientInterestList</code> provides a convenient interface
+    * for manipulating client interest information.
+    */
+   static protected class ClientInterestList
+    {
+     
+     final CacheClientProxy ccp;
+     
+     final Object id;
+ 
+      /**
+      * An object used for synchronizing the interest lists
+      */
+     final private Object interestListLock = new Object();
+     
+     /**
+      * Regions that this client is interested in
+      */
+     final protected Set<String> regions = new HashSet<String>();
+ 
+     /**
+      * Constructor.
+      */
+     protected ClientInterestList(CacheClientProxy ccp, Object interestID) {
+       this.ccp = ccp;
+       this.id = interestID;
+       // this.id = getNextId();
+     }
+ 
+     /**
+      * Registers interest in the input region name and key
+      */
+     protected void registerClientInterest(String regionName,
+         Object keyOfInterest, int interestType, boolean sendUpdatesAsInvalidates)
+     {
+       if (logger.isDebugEnabled()) {
+         logger.debug("{}: registerClientInterest region={} key={}", ccp, regionName, keyOfInterest);
+       }
+       Set keysRegistered = null;
+       synchronized(this.interestListLock) {
+         LocalRegion r = (LocalRegion)this.ccp._cache.getRegion(regionName, true);
+         if (r == null) {
+           throw new RegionDestroyedException("Region could not be found for interest registration", regionName);
+         }
+         if ( ! (r instanceof CacheDistributionAdvisee) ) {
+           throw new IllegalArgumentException("region " + regionName + " is not distributed and does not support interest registration");
+         }
+         FilterProfile p = r.getFilterProfile();
+         keysRegistered = p.registerClientInterest(id, keyOfInterest, interestType, sendUpdatesAsInvalidates);
+         regions.add(regionName);
+       }
+       // Perform actions if any keys were registered  
+       if ((keysRegistered != null) && containsInterestRegistrationListeners()
+           && !keysRegistered.isEmpty()) {
+         handleInterestEvent(regionName, keysRegistered, interestType, true);
+       } 
+     }
+     
+     
+     protected FilterProfile getProfile(String regionName) {
+       try {
+         return this.ccp._cache.getFilterProfile(regionName);
+       } catch (CacheClosedException e) {
+         return null;
+       }
+     }
+ 
+     /**
+      * Unregisters interest in the input region name and key
+      *
+      * @param regionName
+      *          The fully-qualified name of the region in which to unregister
+      *          interest
+      * @param keyOfInterest
+      *          The key in which to unregister interest
+      */
+     protected void unregisterClientInterest(String regionName,
+         Object keyOfInterest, int interestType)
+     {
+       if (logger.isDebugEnabled()) {
+         logger.debug("{}: unregisterClientInterest region={} key={}", ccp, regionName, keyOfInterest);
+       }
+       FilterProfile p = getProfile(regionName);
+       Set keysUnregistered = null;
+       synchronized(this.interestListLock) {
+         if (p != null) {
+           keysUnregistered = p.unregisterClientInterest(
+             id, keyOfInterest, interestType);
+           if (!p.hasInterestFor(id)) {
+             this.regions.remove(regionName);
+           }
+         } else {
+           this.regions.remove(regionName);
+         }
+       }
+       if (keysUnregistered != null && !keysUnregistered.isEmpty()) { 
+         handleInterestEvent(regionName, keysUnregistered, interestType, false); 
+       } 
+     }
+ 
+     /**
+      * Registers interest in the input region name and list of keys
+      *
+      * @param regionName
+      *          The fully-qualified name of the region in which to register
+      *          interest
+      * @param keysOfInterest
+      *          The list of keys in which to register interest
+      */
+     protected void registerClientInterestList(String regionName,
+         List keysOfInterest, boolean sendUpdatesAsInvalidates) {
+       FilterProfile p = getProfile(regionName);
+       if (p == null) {
+         throw new RegionDestroyedException("Region not found during client interest registration", regionName);
+       }
+       Set keysRegistered = null;
+       synchronized(this.interestListLock) {
+         keysRegistered = p.registerClientInterestList(id, keysOfInterest, sendUpdatesAsInvalidates);
+         regions.add(regionName);
+       }
+       // Perform actions if any keys were registered 
+       if (containsInterestRegistrationListeners() && !keysRegistered.isEmpty()) {
+         handleInterestEvent(regionName, keysRegistered, InterestType.KEY, true);
+       } 
+     }
+ 
+     /**
+      * Unregisters interest in the input region name and list of keys
+      *
+      * @param regionName
+      *          The fully-qualified name of the region in which to unregister
+      *          interest
+      * @param keysOfInterest
+      *          The list of keys in which to unregister interest
+      */
+     protected void unregisterClientInterestList(String regionName,
+         List keysOfInterest)
+     {
+       FilterProfile p = getProfile(regionName);
+       Set keysUnregistered = null;
+       synchronized(this.interestListLock) {
+         if (p != null) {
+           keysUnregistered = p.unregisterClientInterestList(
+               id, keysOfInterest);
+           if (!p.hasInterestFor(id)) {
+             regions.remove(regionName);
+           }
+         } else {
+           regions.remove(regionName);
+         }
+       }
+      // Perform actions if any keys were unregistered
+       if (!keysUnregistered.isEmpty()) { 
+         handleInterestEvent(regionName, keysUnregistered, InterestType.KEY,false); 
+       } 
+     }
+ 
+     /*
+      * Returns whether this interest list has any keys, patterns or filters of
+      * interest. It answers the question: Are any clients being notified because
+      * of this interest list? @return whether this interest list has any keys,
+      * patterns or filters of interest
+      */
+     protected boolean hasInterest() {
+       return regions.size() > 0;
+     }
+ 
+     protected void clearClientInterestList() {
+       boolean isClosed = ccp.getCache().isClosed();
+       
+       synchronized(this.interestListLock) {
+         for (String regionName: regions) {
+           FilterProfile p = getProfile(regionName);
+           if (p == null) {
+             continue;
+           }
+           if (!isClosed) {
+             if (p.hasAllKeysInterestFor(id)) {
+               Set allKeys = new HashSet();
+               allKeys.add(".*");
+               allKeys = Collections.unmodifiableSet(allKeys);
+               handleInterestEvent(regionName, allKeys,
+                   InterestType.REGULAR_EXPRESSION, false);
+             }
+             Set keysOfInterest = p.getKeysOfInterestFor(id);
+             if (keysOfInterest != null && keysOfInterest.size() > 0) {
+               handleInterestEvent(regionName, keysOfInterest,
+                   InterestType.KEY, false);
+             }
+             Map<String,Pattern> patternsOfInterest =
+               p.getPatternsOfInterestFor(id);
+             if (patternsOfInterest != null && patternsOfInterest.size() > 0) {
+               handleInterestEvent(regionName, patternsOfInterest.keySet(),
+                   InterestType.REGULAR_EXPRESSION, false);
+             }
+           }
+           p.clearInterestFor(id);
+         }
+         regions.clear();
+       }
+     }
+ 
+ 
+     private void handleInterestEvent(String regionName, Set keysOfInterest,
+         int interestType, boolean isRegister) {
+       // Notify the region about this register interest event if:
+       // - the application has requested it
+       // - this is a primary CacheClientProxy (otherwise multiple notifications
+       // may occur)
+       // - it is a key interest type (regex is currently not supported)
+       InterestRegistrationEvent event = null;
+       if (NOTIFY_REGION_ON_INTEREST && this.ccp.isPrimary()
+           && interestType == InterestType.KEY) {
+         event = new InterestRegistrationEventImpl(this.ccp, regionName,
+             keysOfInterest, interestType, isRegister);
+         try {
+           notifyRegionOfInterest(event);
+         }
+         catch (Exception e) {
+           logger.warn(LocalizedStrings.CacheClientProxy_REGION_NOTIFICATION_OF_INTEREST_FAILED, e);
+         }
+       }
+       // Invoke interest registration listeners
+       if (containsInterestRegistrationListeners()) {
+         if (event == null) {
+           event = new InterestRegistrationEventImpl(this.ccp, regionName,
+               keysOfInterest, interestType, isRegister);
+         }
+         notifyInterestRegistrationListeners(event);
+       }      
+     }
+ 
+     private void notifyRegionOfInterest(InterestRegistrationEvent event) {
+       this.ccp.getCacheClientNotifier().handleInterestEvent(event);
+     }
+ 
+     private void notifyInterestRegistrationListeners(
+         InterestRegistrationEvent event) {
+       this.ccp.getCacheClientNotifier().notifyInterestRegistrationListeners(
+           event);
+     }
+ 
+     private boolean containsInterestRegistrationListeners() {
+       return this.ccp.getCacheClientNotifier()
+           .containsInterestRegistrationListeners();
+     }
+   }
+ 
+ 
+   /**
+    * Class <code>MessageDispatcher</code> is a <code>Thread</code> that
+    * processes messages bound for the client by taking messsages from the
+    * message queue and sending them to the client over the socket.
+    */
+   static class MessageDispatcher extends Thread
+    {
+ 
+     /**
+      * The queue of messages to be sent to the client
+      */
+     protected final HARegionQueue _messageQueue;
+ 
+ //    /**
+ //     * An int used to keep track of the number of messages dropped for logging
+ //     * purposes. If greater than zero then a warning has been logged about
+ //     * messages being dropped.
+ //     */
+ //    private int _numberOfMessagesDropped = 0;
+ 
+     /**
+      * The proxy for which this dispatcher is processing messages
+      */
+     private final CacheClientProxy _proxy;
+ 
+ //    /**
+ //     * The conflator faciliates message conflation
+ //     */
+ //     protected BridgeEventConflator _eventConflator;
+ 
+     /**
+      * Whether the dispatcher is stopped
+      */
+     private volatile boolean _isStopped = true;
+ 
+     /**
+      * @guarded.By _pausedLock
+      */
+     //boolean _isPausedDispatcher = false;
+     
+     /**
+      * A lock object used to control pausing this dispatcher
+      */
+     protected final Object _pausedLock = new Object();
+ 
+     /**
+      * An object used to protect when dispatching is being stopped.
+      */
+     private final Object _stopDispatchingLock = new Object();
+ 
+     private final ReadWriteLock socketLock = new ReentrantReadWriteLock();
+ 
+     private final Lock socketWriteLock = socketLock.writeLock();
+ //    /**
+ //     * A boolean verifying whether a warning has already been issued if the
+ //     * message queue has reached its capacity.
+ //     */
+ //    private boolean _messageQueueCapacityReachedWarning = false;
+ 
+     /**
+      * Constructor.
+      *
+      * @param proxy
+      *          The <code>CacheClientProxy</code> for which this dispatcher is
+      *          processing messages
+      * @param name thread name for this dispatcher
+      

<TRUNCATED>