You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/02/22 22:44:07 UTC
[079/100] [abbrv] incubator-geode git commit: GEODE-917: Merge branch
'feature/GEODE-917' into develop
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
index 0000000,69ae6d8..f650fee
mode 000000,100755..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
@@@ -1,0 -1,3112 +1,3116 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package com.gemstone.gemfire.internal.cache.tier.sockets;
+
+ import java.io.ByteArrayInputStream;
+ import java.io.DataInputStream;
+ import java.io.IOException;
+ import java.net.Socket;
+ import java.net.SocketException;
+ import java.nio.ByteBuffer;
+ import java.util.Arrays;
+ import java.util.Collections;
+ import java.util.Date;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.ConcurrentLinkedQueue;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReadWriteLock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
+ import java.util.regex.Pattern;
+
+ import org.apache.logging.log4j.Logger;
+
+ import com.gemstone.gemfire.CancelException;
+ import com.gemstone.gemfire.DataSerializer;
+ import com.gemstone.gemfire.StatisticsFactory;
+ import com.gemstone.gemfire.cache.Cache;
+ import com.gemstone.gemfire.cache.CacheClosedException;
+ import com.gemstone.gemfire.cache.CacheException;
+ import com.gemstone.gemfire.cache.ClientSession;
+ import com.gemstone.gemfire.cache.DynamicRegionFactory;
+ import com.gemstone.gemfire.cache.InterestRegistrationEvent;
+ import com.gemstone.gemfire.cache.InterestResultPolicy;
+ import com.gemstone.gemfire.cache.Region;
+ import com.gemstone.gemfire.cache.RegionDestroyedException;
+ import com.gemstone.gemfire.cache.RegionExistsException;
+ import com.gemstone.gemfire.cache.client.internal.RegisterInterestTracker;
+ import com.gemstone.gemfire.cache.operations.DestroyOperationContext;
+ import com.gemstone.gemfire.cache.operations.InvalidateOperationContext;
+ import com.gemstone.gemfire.cache.operations.OperationContext;
+ import com.gemstone.gemfire.cache.operations.PutOperationContext;
+ import com.gemstone.gemfire.cache.operations.RegionClearOperationContext;
+ import com.gemstone.gemfire.cache.operations.RegionCreateOperationContext;
+ import com.gemstone.gemfire.cache.operations.RegionDestroyOperationContext;
+ import com.gemstone.gemfire.cache.query.CqException;
+ import com.gemstone.gemfire.cache.query.CqQuery;
+ import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+ import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
+ import com.gemstone.gemfire.distributed.DistributedMember;
+ import com.gemstone.gemfire.distributed.internal.DistributionManager;
+ import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+ import com.gemstone.gemfire.internal.SystemTimer;
+ import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
+ import com.gemstone.gemfire.internal.Version;
+ import com.gemstone.gemfire.internal.cache.ClientServerObserver;
+ import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
+ import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisee;
+ import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.InitialImageAdvice;
+ import com.gemstone.gemfire.internal.cache.Conflatable;
+ import com.gemstone.gemfire.internal.cache.DistributedRegion;
+ import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
+ import com.gemstone.gemfire.internal.cache.EventID;
+ import com.gemstone.gemfire.internal.cache.FilterProfile;
+ import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+ import com.gemstone.gemfire.internal.cache.InterestRegistrationEventImpl;
+ import com.gemstone.gemfire.internal.cache.LocalRegion;
+ import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+ import com.gemstone.gemfire.internal.cache.StateFlushOperation;
+ import com.gemstone.gemfire.internal.cache.ha.HAContainerWrapper;
+ import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
+ import com.gemstone.gemfire.internal.cache.ha.HARegionQueueAttributes;
+ import com.gemstone.gemfire.internal.cache.ha.HARegionQueueStats;
+ import com.gemstone.gemfire.internal.cache.tier.InterestType;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessageImpl.CqNameToOp;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.command.Get70;
+ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+ import com.gemstone.gemfire.internal.logging.LogService;
+ import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+ import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+ import com.gemstone.gemfire.internal.security.AuthorizeRequestPP;
+ import com.gemstone.gemfire.security.AccessControl;
+ import com.gemstone.gemfire.i18n.StringId;
+
+ /**
+ * Class <code>CacheClientProxy</code> represents the server side of the
+ * {@link CacheClientUpdater}. It queues messages to be sent from the server to
+ * the client. It then reads those messages from the queue and sends them to the
+ * client.
+ *
+ * @author Barry Oglesby
+ *
+ * @since 4.2
+ */
+ @SuppressWarnings("synthetic-access")
+ public class CacheClientProxy implements ClientSession {
+ private static final Logger logger = LogService.getLogger();
+
+ /**
+ * The socket between the server and the client
+ */
+ protected Socket _socket;
+
+ private final AtomicBoolean _socketClosed = new AtomicBoolean();
+
+ /**
+ * A communication buffer used by each message we send to the client
+ */
+ protected ByteBuffer _commBuffer;
+
+ /**
+ * The remote host's IP address string (cached for convenience)
+ */
+ protected String _remoteHostAddress;
+
+ /**
+ * Concurrency: protected by synchronization of {@link #isMarkedForRemovalLock}
+ */
+ protected boolean isMarkedForRemoval = false;
+
+ /**
+ * @see #isMarkedForRemoval
+ */
+ protected final Object isMarkedForRemovalLock = new Object();
+
+ /**
+ * The proxy id of the client represented by this proxy
+ */
+ protected ClientProxyMembershipID proxyID;
+
+ /**
+ * The GemFire cache
+ */
+ protected final GemFireCacheImpl _cache;
+
+ /**
+ * The list of keys that the client represented by this proxy is interested in
+ * (stored by region)
+ */
+ protected final ClientInterestList[] cils = new ClientInterestList[2];
+
+ /**
+ * A thread that dispatches messages to the client
+ */
+ protected volatile MessageDispatcher _messageDispatcher;
+
+ /**
+ * The statistics for this proxy
+ */
+ protected final CacheClientProxyStats _statistics;
+
+ protected final AtomicReference _durableExpirationTask = new AtomicReference();
+
+ protected SystemTimer durableTimer;
+
+ /**
+ * Whether this dispatcher is paused
+ */
+ protected volatile boolean _isPaused = true;
+
+ /**
+ * True if we are connected to a client.
+ */
+ private volatile boolean connected = false;
+ // /**
+ // * A string representing interest in all keys
+ // */
+ // protected static final String ALL_KEYS = "ALL_KEYS";
+ //
+ /**
+ * True if a marker message is still in the ha queue.
+ */
+ private boolean markerEnqueued = false;
+
+ /**
+ * The number of times to peek on shutdown before giving up and shutting down
+ */
+ protected static final int MAXIMUM_SHUTDOWN_PEEKS = Integer.getInteger("gemfire.MAXIMUM_SHUTDOWN_PEEKS",50).intValue();
+
+ /**
+ * The number of milliseconds to wait for an offering to the message queue
+ */
+ protected static final int MESSAGE_OFFER_TIME = 0;
+
+ /**
+ * The default maximum message queue size
+ */
+ // protected static final int MESSAGE_QUEUE_SIZE_DEFAULT = 230000;
+
+ /** The message queue size */
+ protected final int _maximumMessageCount;
+
+ /**
+ * The time (in seconds ) after which a message in the client queue will
+ * expire.
+ */
+ protected final int _messageTimeToLive;
+
+ /**
+ * The <code>CacheClientNotifier</code> registering this proxy.
+ */
+ protected final CacheClientNotifier _cacheClientNotifier;
+
+ /**
+ * Defaults to true; meaning do some logging of dropped client notification
+ * messages. Set the system property to true to cause dropped messages to NOT
+ * be logged.
+ */
+ protected static final boolean LOG_DROPPED_MSGS = !Boolean
+ .getBoolean("gemfire.disableNotificationWarnings");
+
+ /**
+ * for testing purposes, delays the start of the dispatcher thread
+ */
+ public static boolean isSlowStartForTesting = false;
+
+ /**
+ * Default value for slow starting time of dispatcher
+ */
+ private static final long DEFAULT_SLOW_STARTING_TIME = 5000;
+
+ /**
+ * Key in the system property from which the slow starting time value will be
+ * retrieved
+ */
+ private static final String KEY_SLOW_START_TIME_FOR_TESTING = "slowStartTimeForTesting";
+
+ private boolean isPrimary;
+
+ /** @since 5.7 */
+ protected byte clientConflation = HandShake.CONFLATION_DEFAULT;
+
+ /**
+ * Flag to indicate whether to keep a durable client's queue alive
+ */
+ boolean keepalive = false;
+
+ private AccessControl postAuthzCallback;
+
+ /**
+ * For multiuser environment..
+ */
+ private ClientUserAuths clientUserAuths;
+
+ private final Object clientUserAuthsLock = new Object();
+
+ /**
+ * The version of the client
+ */
+ private Version clientVersion;
+
+ /**
+ * A map of region name as key and integer as its value. Basically, it stores
+ * the names of the regions with <code>DataPolicy</code> as EMPTY. If an
+ * event's region name is present in this map, it's full value (and not
+ * delta) is sent to the client represented by this proxy.
+ *
+ * @since 6.1
+ */
+ private volatile Map regionsWithEmptyDataPolicy = new HashMap();
+
+ /**
+ * A debug flag used for testing Backward compatibility
+ */
+ public static boolean AFTER_MESSAGE_CREATION_FLAG = false;
+
+ /**
+ * Notify the region when a client interest registration occurs. This tells
+ * the region to update access time when an update is to be pushed to a
+ * client. It is enabled only for <code>PartitionedRegion</code>s
+ * currently.
+ */
+ protected static final boolean NOTIFY_REGION_ON_INTEREST = Boolean
+ .getBoolean("gemfire.updateAccessTimeOnClientInterest");
+
+ /**
+ * The AcceptorImpl identifier to which the proxy is connected.
+ */
+ private final long _acceptorId;
+
+ /** acceptor's setting for notifyBySubscription */
+ private final boolean notifyBySubscription;
+
+ /** To queue the events arriving during message dispatcher initialization */
+ private volatile ConcurrentLinkedQueue<Conflatable> queuedEvents = new ConcurrentLinkedQueue<Conflatable>();
+
+ private final Object queuedEventsSync = new Object();
+
+ private volatile boolean messageDispatcherInit = false;
+
+ /**
+ * A counter that keeps track of how many task iterations that have occurred
+ * since the last ping or message. The
+ * {@linkplain CacheClientNotifier#scheduleClientPingTask ping task}
+ * increments it. Normal messages sent to the client reset it. If the counter
+ * reaches 3, a ping is sent.
+ */
+ private final AtomicInteger pingCounter = new AtomicInteger();
+
+
+ /** Date on which this instances was created */
+ private Date creationDate;
+
+ /** true when the durable client associated with this proxy is being
+ * restarted and prevents cqs from being closed and drained**/
+ private boolean drainLocked = false;
+ private final Object drainLock = new Object();
+
+ /** number of cq drains that are currently in progress **/
+ private int numDrainsInProgress = 0;
+ private final Object drainsInProgressLock = new Object();
+
+ /**
+ * Constructor.
+ *
+ * @param ccn
+ * The <code>CacheClientNotifier</code> registering this proxy
+ * @param socket
+ * The socket between the server and the client
+ * @param proxyID
+ * representing the Connection Proxy of the clien
+ * @param isPrimary
+ * The boolean stating whether this prozxy is primary
+ * @throws CacheException {
+ */
+ protected CacheClientProxy(CacheClientNotifier ccn, Socket socket,
+ ClientProxyMembershipID proxyID, boolean isPrimary, byte clientConflation,
+ Version clientVersion, long acceptorId, boolean notifyBySubscription)
+ throws CacheException {
+ initializeTransientFields(socket, proxyID, isPrimary, clientConflation, clientVersion);
+ this._cacheClientNotifier = ccn;
+ this._cache = (GemFireCacheImpl)ccn.getCache();
+ this._maximumMessageCount = ccn.getMaximumMessageCount();
+ this._messageTimeToLive = ccn.getMessageTimeToLive();
+ this._acceptorId = acceptorId;
+ this.notifyBySubscription = notifyBySubscription;
+ StatisticsFactory factory = this._cache.getDistributedSystem();
+ this._statistics = new CacheClientProxyStats(factory,
+ "id_"+this.proxyID.getDistributedMember().getId()+ "_at_"+ this._remoteHostAddress + ":" + this._socket.getPort());
+
+ // Create the interest list
+ this.cils[RegisterInterestTracker.interestListIndex] =
+ new ClientInterestList(this, this.proxyID);
+ // Create the durable interest list
+ this.cils[RegisterInterestTracker.durableInterestListIndex] =
+ new ClientInterestList(this, this.getDurableId());
+ this.postAuthzCallback = null;
+ this._cacheClientNotifier.getAcceptorStats().incCurrentQueueConnections();
+ this.creationDate = new Date();
+ initializeClientAuths();
+ }
+
+ private void initializeClientAuths()
+ {
+ if(AcceptorImpl.isPostAuthzCallbackPresent())
+ this.clientUserAuths = ServerConnection.getClientUserAuths(this.proxyID);
+ }
+
+ private void reinitializeClientAuths()
+ {
+ if (this.clientUserAuths != null && AcceptorImpl.isPostAuthzCallbackPresent()) {
+ synchronized (this.clientUserAuthsLock) {
+ ClientUserAuths newClientAuth = ServerConnection.getClientUserAuths(this.proxyID);
+ newClientAuth.fillPreviousCQAuth(this.clientUserAuths);
+ this.clientUserAuths = newClientAuth;
+ }
+ }
+ }
+
+ public void setPostAuthzCallback(AccessControl authzCallback) {
+ //TODO:hitesh synchronization
+ synchronized (this.clientUserAuthsLock) {
+ if (this.postAuthzCallback != null)
+ this.postAuthzCallback.close();
+ this.postAuthzCallback = authzCallback;
+ }
+ }
+
+ public void setCQVsUserAuth(String cqName, long uniqueId, boolean isDurable)
+ {
+ if(postAuthzCallback == null) //only for multiuser
+ {
+ if(this.clientUserAuths != null)
+ this.clientUserAuths.setUserAuthAttributesForCq(cqName, uniqueId, isDurable);
+ }
+ }
+
+ private void initializeTransientFields(Socket socket,
+ ClientProxyMembershipID pid, boolean ip, byte cc, Version vers) {
+ this._socket = socket;
+ this.proxyID = pid;
+ this.connected = true;
+ {
+ int bufSize = 1024;
+ try {
+ bufSize = _socket.getSendBufferSize();
+ if (bufSize < 1024) {
+ bufSize = 1024;
+ }
+ } catch (SocketException ignore) {
+ }
+ this._commBuffer = ServerConnection.allocateCommBuffer(bufSize, socket);
+ }
+ this._remoteHostAddress = socket.getInetAddress().getHostAddress();
+ this.isPrimary = ip;
+ this.clientConflation = cc;
+ this.clientVersion = vers;
+ }
+
+ public boolean isMarkerEnqueued() {
+ return markerEnqueued;
+ }
+
+ public void setMarkerEnqueued(boolean bool) {
+ markerEnqueued = bool;
+ }
+
+ public long getAcceptorId(){
+ return this._acceptorId;
+ }
+
+ /**
+ * @return the notifyBySubscription
+ */
+ public boolean isNotifyBySubscription() {
+ return this.notifyBySubscription;
+ }
+
+
+ /**
+ * Returns the DistributedMember represented by this proxy
+ */
+ public ClientProxyMembershipID getProxyID()
+ {
+ return this.proxyID;
+ }
+
+ // the following code was commented out simply because it was not used
+ // /**
+ // * Determines if the proxy represents the client host (and only the host, not
+ // * necessarily the exact VM running on the host)
+ // *
+ // * @return Whether the proxy represents the client host
+ // */
+ // protected boolean representsClientHost(String clientHost)
+ // {
+ // // [bruce] TODO BUGBUGBUG: this should compare InetAddresses, not Strings
+ // return this._remoteHostAddress.equals(clientHost);
+ // }
+
+ // protected boolean representsClientVM(DistributedMember remoteMember)
+ // {
+ // // logger.warn("Is input port " + clientPort + " contained in " +
+ // // logger.warn("Does input host " + clientHost + " equal " +
+ // // this._remoteHostAddress+ ": " + representsClientHost(clientHost));
+ // // logger.warn("representsClientVM: " +
+ // // (representsClientHost(clientHost) && containsPort(clientPort)));
+ // return (proxyID.getDistributedMember().equals(remoteMember));
+ // }
+
+ // /**
+ // * Determines if the CacheClientUpdater proxied by this instance is listening
+ // * on the input clientHost and clientPort
+ // *
+ // * @param clientHost
+ // * The host name of the client to compare
+ // * @param clientPort
+ // * The port number of the client to compare
+ // *
+ // * @return Whether the CacheClientUpdater proxied by this instance is
+ // * listening on the input clientHost and clientPort
+ // */
+ // protected boolean representsCacheClientUpdater(String clientHost,
+ // int clientPort)
+ // {
+ // return (clientPort == this._socket.getPort() && representsClientHost(clientHost));
+ // }
+
+ protected boolean isMember(ClientProxyMembershipID memberId)
+ {
+ return this.proxyID.equals(memberId);
+ }
+
+ protected boolean isSameDSMember(ClientProxyMembershipID memberId)
+ {
+ return this.proxyID.isSameDSMember(memberId);
+ }
+
+ /**
+ * Set the queue keepalive option
+ *
+ * @param option whether to keep the durable client's queue alive
+ */
+ protected void setKeepAlive(boolean option) {
+ this.keepalive = option;
+ }
+
+ /**
+ * Returns the socket between the server and the client
+ *
+ * @return the socket between the server and the client
+ */
+ protected Socket getSocket()
+ {
+ return this._socket;
+ }
+
+ public String getSocketHost()
+ {
+ return this._socket.getInetAddress().getHostAddress();
+ }
+
+ protected ByteBuffer getCommBuffer() {
+ return this._commBuffer;
+ }
+
+ /**
+ * Returns the remote host's IP address string
+ *
+ * @return the remote host's IP address string
+ */
+ protected String getRemoteHostAddress()
+ {
+ return this._remoteHostAddress;
+ }
+
+ /**
+ * Returns the remote host's port
+ *
+ * @return the remote host's port
+ */
+ public int getRemotePort()
+ {
+ return this._socket.getPort();
+ }
+
+ /**
+ * Returns whether the proxy is connected to a remote client
+ *
+ * @return whether the proxy is connected to a remote client
+ */
+ public boolean isConnected() {
+ return this.connected;
+ }
+
+ /**
+ * Mark the receiver as needing removal
+ * @return true if it was already marked for removal
+ */
+ protected boolean startRemoval() {
+ boolean result;
+ synchronized (this.isMarkedForRemovalLock) {
+ result = this.isMarkedForRemoval;
+ this.isMarkedForRemoval = true;
+ }
+ return result;
+ }
+
+ /**
+ * Wait until the receiver's removal has completed before
+ * returning.
+ * @return true if the proxy was initially marked for removal
+ */
+ protected boolean waitRemoval() {
+ boolean result;
+ synchronized (this.isMarkedForRemovalLock) {
+ result = this.isMarkedForRemoval;
+ boolean interrupted = false;
+ try {
+ while (this.isMarkedForRemoval) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Waiting for CacheClientProxy removal: {}", this);
+ }
+ try {
+ this.isMarkedForRemovalLock.wait();
+ }
+ catch (InterruptedException e) {
+ interrupted = true;
+ this._cache.getCancelCriterion().checkCancelInProgress(e);
+ }
+ } // while
+ }
+ finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } // synchronized
+ return result;
+ }
+
+ /**
+ * Indicate that removal has completed on this instance
+ */
+ protected void notifyRemoval() {
+ synchronized (this.isMarkedForRemovalLock) {
+ this.isMarkedForRemoval = false;
+ this.isMarkedForRemovalLock.notifyAll();
+ }
+ }
+
+ /**
+ * Returns the GemFire cache
+ *
+ * @return the GemFire cache
+ */
+ public GemFireCacheImpl getCache()
+ {
+ return this._cache;
+ }
+
+ public Set<String> getInterestRegisteredRegions() {
+ HashSet<String> regions = new HashSet<String>();
+ for(int i=0; i < this.cils.length; i++){
+ if (!this.cils[i].regions.isEmpty()) {
+ regions.addAll(this.cils[i].regions);
+ }
+ }
+ return regions;
+ }
+
+ /**
+ * Returns the proxy's statistics
+ *
+ * @return the proxy's statistics
+ */
+ public CacheClientProxyStats getStatistics()
+ {
+ return this._statistics;
+ }
+
+ /**
+ * Returns this proxy's <code>CacheClientNotifier</code>.
+ * @return this proxy's <code>CacheClientNotifier</code>
+ */
+ protected CacheClientNotifier getCacheClientNotifier() {
+ return this._cacheClientNotifier;
+ }
+
+ /**
+ * Returns the size of the queue for heuristic purposes. This
+ * size may be changing concurrently if puts/gets are occurring
+ * at the same time.
+ */
+ public int getQueueSize() {
+ return this._messageDispatcher == null ? 0
+ : this._messageDispatcher.getQueueSize();
+ }
+
+ /**
+ * returns the queue size calculated through stats
+ */
+ public int getQueueSizeStat() {
+ return this._messageDispatcher == null ? 0
+ : this._messageDispatcher.getQueueSizeStat();
+ }
+
+
+ public boolean drainInProgress() {
+ synchronized(drainsInProgressLock) {
+ return numDrainsInProgress > 0;
+ }
+ }
+
+ //Called from CacheClientNotifier when attempting to restart paused proxy
+ //locking the drain lock requires that no drains are in progress
+ //when the lock was acquired.
+ public boolean lockDrain() {
+ synchronized(drainsInProgressLock) {
+ if (!drainInProgress()) {
+ synchronized(drainLock) {
+ if (testHook != null) {
+ testHook.doTestHook("PRE_ACQUIRE_DRAIN_LOCK_UNDER_SYNC");
+ }
+ //prevent multiple lockings of drain lock
+ if (!drainLocked) {
+ drainLocked = true;
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ //Called from CacheClientNotifier when completed restart of proxy
+ public void unlockDrain() {
+ if (testHook != null) {
+ testHook.doTestHook("PRE_RELEASE_DRAIN_LOCK");
+ }
+ synchronized(drainLock) {
+ drainLocked = false;
+ }
+ }
+
+ //Only close the client cq if it is paused and no one is attempting to restart the proxy
+ public boolean closeClientCq(String clientCQName) throws CqException {
+ if (testHook != null) {
+ testHook.doTestHook("PRE_DRAIN_IN_PROGRESS");
+ }
+ synchronized(drainsInProgressLock) {
+ numDrainsInProgress ++;
+ }
+ if (testHook != null) {
+ testHook.doTestHook("DRAIN_IN_PROGRESS_BEFORE_DRAIN_LOCK_CHECK");
+ }
+ try {
+ //If the drain lock was acquired, the other thread did so before we could bump up
+ //the numDrainsInProgress. That means we need to stop.
+ if (drainLocked) {
+ // someone is trying to restart a paused proxy
+ String msg = LocalizedStrings.CacheClientProxy_COULD_NOT_DRAIN_CQ_DUE_TO_RESTARTING_DURABLE_CLIENT.toLocalizedString(clientCQName, proxyID.getDurableId());
+ logger.info(msg);
+ throw new CqException(msg);
+ }
+ //isConnected is to protect against the case where a durable client has reconnected
+ //but has not yet sent a ready for events message
+ //we can probably remove the isPaused check
+ if (isPaused() && !isConnected()) {
+ CqService cqService = getCache().getCqService();
+ if (cqService != null) {
+ InternalCqQuery cqToClose = cqService.getCq(cqService.constructServerCqName(
+ clientCQName, this.proxyID));
+ // close and drain
+ if (cqToClose != null) {
+ cqService.closeCq(clientCQName, this.proxyID);
+ this._messageDispatcher.drainClientCqEvents(this.proxyID, cqToClose);
+ }
+ else {
+ String msg = LocalizedStrings.CqService_CQ_NOT_FOUND_FAILED_TO_CLOSE_THE_SPECIFIED_CQ_0.toLocalizedString(clientCQName);
+ logger.info(msg);
+ throw new CqException(msg);
+ }
+ }
+ } else {
+ String msg = LocalizedStrings.CacheClientProxy_COULD_NOT_DRAIN_CQ_DUE_TO_ACTIVE_DURABLE_CLIENT.toLocalizedString(clientCQName, proxyID.getDurableId());
+ logger.info(msg);
+ throw new CqException(msg);
+ }
+ } finally {
+ synchronized (drainsInProgressLock) {
+ numDrainsInProgress--;
+ }
+ if (testHook != null) {
+ testHook.doTestHook("DRAIN_COMPLETE");
+ }
+
+ }
+ return true;
+ }
+
+
+ /**
+ * Returns whether the proxy is alive. It is alive if its message dispatcher
+ * is processing messages.
+ *
+ * @return whether the proxy is alive
+ */
+ protected boolean isAlive()
+ {
+ if (this._messageDispatcher == null) {
+ return false;
+ }
+ return !this._messageDispatcher.isStopped();
+ }
+
+ /**
+ * Returns whether the proxy is paused. It is paused if its message dispatcher
+ * is paused. This only applies to durable clients.
+ *
+ * @return whether the proxy is paused
+ *
+ * @since 5.5
+ */
+ protected boolean isPaused() {
+ return this._isPaused;
+ }
+
+ protected void setPaused(boolean isPaused) {
+ this._isPaused = isPaused;
+ }
+
+ /**
+ * Closes the proxy. This method checks the message queue for any unprocessed
+ * messages and processes them for MAXIMUM_SHUTDOWN_PEEKS.
+ *
+ * @see CacheClientProxy#MAXIMUM_SHUTDOWN_PEEKS
+ */
+ protected void close()
+ {
+ close(true, false);
+ }
+
+ /**
+ * Set to true once this proxy starts being closed.
+ * Remains true for the rest of its existence.
+ */
+ private final AtomicBoolean closing = new AtomicBoolean(false);
+
+ /**
+ * Close the <code>CacheClientProxy</code>.
+ *
+ * @param checkQueue
+ * Whether to message check the queue and process any contained
+ * messages (up to MAXIMUM_SHUTDOWN_PEEKS).
+ * @param stoppedNormally
+ * Whether client stopped normally
+ *
+ * @return whether to keep this <code>CacheClientProxy</code>
+ * @see CacheClientProxy#MAXIMUM_SHUTDOWN_PEEKS
+ */
+ protected boolean close(boolean checkQueue, boolean stoppedNormally) {
+ boolean pauseDurable = false;
+ // If the client is durable and either (a) it hasn't stopped normally or (b) it
+ // has stopped normally but it is configured to be kept alive, set pauseDurable
+ // to true
+ if (isDurable()
+ && (!stoppedNormally || (getDurableKeepAlive() && stoppedNormally))) {
+ pauseDurable = true;
+ }
+
+ boolean keepProxy = false;
+ if (pauseDurable) {
+ pauseDispatching();
+ keepProxy = true;
+ } else {
+ terminateDispatching(checkQueue);
+ closeTransientFields();
+ }
+
+ this.connected = false;
+
+ // Close the Authorization callback (if any)
+ try {
+ if (!pauseDurable) {
+ if (this.postAuthzCallback != null) {//for single user
+ this.postAuthzCallback.close();
+ this.postAuthzCallback = null;
+ }else if(this.clientUserAuths != null) {//for multiple users
+ this.clientUserAuths.cleanup(true);
+ this.clientUserAuths = null;
+ }
+ }
+ }
+ catch (Exception ex) {
+ if (this._cache.getSecurityLoggerI18n().warningEnabled()) {
+ this._cache.getSecurityLoggerI18n().warning(LocalizedStrings.TWO_ARG_COLON, new Object[] {this, ex});
+ }
+ }
+ // Notify the caller whether to keep this proxy. If the proxy is durable
+ // and should be paused, then return true; otherwise return false.
+ return keepProxy;
+ }
+
+ protected void pauseDispatching() {
+ if (this._messageDispatcher == null){
+ return;
+ }
+
+ // If this is the primary, pause the dispatcher (which closes its transient
+ // fields. Otherwise, just close the transient fields.
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Pausing processing", this);
+ }
+ //BUGFIX for BUG#38234
+ if(!testAndSetPaused(true) && this.isPrimary) {
+ if (this._messageDispatcher != Thread.currentThread()) {
+ // don't interrupt ourself to fix bug 40611
+ this._messageDispatcher.interrupt();
+ }
+ }
+
+ try {
+ // Close transient fields
+ closeTransientFields();
+ } finally {
+ // make sure this gets called if closeTransientFields throws; see bug 40611
+ // Start timer
+ scheduleDurableExpirationTask();
+ }
+ }
+
+ private boolean testAndSetPaused(boolean newValue) {
+
+ synchronized(this._messageDispatcher._pausedLock) {
+ if (this._isPaused != newValue) {
+ this._isPaused = newValue;
+ this._messageDispatcher._pausedLock.notifyAll();
+ return !this._isPaused;
+ }
+ else {
+ this._messageDispatcher._pausedLock.notifyAll();
+ return this._isPaused;
+ }
+ }
+ }
+ protected void terminateDispatching(boolean checkQueue) {
+ if (this._messageDispatcher == null){
+ return;
+ }
+
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Terminating processing", this);
+ }
+ if (this._messageDispatcher == Thread.currentThread()) {
+ // I'm not even sure this is possible but if the dispatcher
+ // calls us then at least call stopDispatching
+ // the old code did this (I'm not even sure it is safe to do).
+ // This needs to be done without testing OR setting "closing".
+ this._messageDispatcher.stopDispatching(checkQueue);
+ this.cils[RegisterInterestTracker.interestListIndex].clearClientInterestList();
+ this.cils[RegisterInterestTracker.durableInterestListIndex].clearClientInterestList();
+ // VJR: bug 37487 fix
+ destroyRQ();
+ return;
+ }
+
+ if (!this.closing.compareAndSet(false, true)) {
+ // must already be closing so just return
+ // this is part of the fix for 37684
+ return;
+ }
+ // Unregister interest in all interests (if necessary)
+ this.cils[RegisterInterestTracker.interestListIndex].clearClientInterestList();
+ this.cils[RegisterInterestTracker.durableInterestListIndex].clearClientInterestList();
+
+ // If the message dispatcher is paused, unpause it. The next bit of
+ // code will interrupt the waiter.
+ if (this.testAndSetPaused(false)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Paused but terminating processing", this);
+ }
+ // Cancel the expiration task
+ cancelDurableExpirationTask(false);
+ }
+
+ boolean alreadyDestroyed = false;
+ boolean gotInterrupt = Thread.interrupted(); // clears the flag
+ try {
+ // Stop the message dispatcher
+ this._messageDispatcher.stopDispatching(checkQueue);
+
+ gotInterrupt |= Thread.interrupted(); // clears the flag
+
+ // to fix bug 37684
+ // 1. check to see if dispatcher is still alive
+ if (this._messageDispatcher.isAlive()) {
+ closeSocket();
+ destroyRQ();
+ alreadyDestroyed = true;
+ this._messageDispatcher.interrupt();
+ if (this._messageDispatcher.isAlive()) {
+ try {
+ this._messageDispatcher.join(1000);
+ } catch (InterruptedException ex) {
+ gotInterrupt = true;
+ }
+ // if it is still alive then warn and move on
+ if (this._messageDispatcher.isAlive()) {
+ //com.gemstone.gemfire.internal.OSProcess.printStacks(com.gemstone.gemfire.internal.OSProcess.getId());
+ logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_COULD_NOT_STOP_MESSAGE_DISPATCHER_THREAD, this));
+ }
+ }
+ }
+ }
+ finally {
+ if (gotInterrupt) {
+ Thread.currentThread().interrupt();
+ }
+ if (!alreadyDestroyed) {
+ destroyRQ();
+ }
+ }
+ } finally {
+ // Close the statistics
+ this._statistics.close(); // fix for bug 40105
+ closeTransientFields(); // make sure this happens
+ }
+ }
+
+ private void closeSocket() {
+ if (this._socketClosed.compareAndSet(false, true)) {
+ // Close the socket
+ this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, this._remoteHostAddress, null);
+ getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
+ }
+ }
+
+ private void closeTransientFields() {
+ closeSocket();
+
+ // Null out comm buffer, host address, ports and proxy id. All will be
+ // replaced when the client reconnects.
+ releaseCommBuffer();
+ {
+ String remoteHostAddress = this._remoteHostAddress;
+ if (remoteHostAddress != null) {
+ this._cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress);
+ this._remoteHostAddress = null;
+ }
+ }
+ try {
+ this.cils[RegisterInterestTracker.interestListIndex].clearClientInterestList();
+ } catch (CacheClosedException e) {
+ // ignore if cache is shutting down
+ }
+ // Commented to fix bug 40259
+ //this.clientVersion = null;
+ closeNonDurableCqs();
+ }
+
+ private void releaseCommBuffer() {
+ ByteBuffer bb = this._commBuffer;
+ if (bb != null) {
+ this._commBuffer = null;
+ ServerConnection.releaseCommBuffer(bb);
+ }
+ }
+
+ private void closeNonDurableCqs(){
+ CqService cqService = getCache().getCqService();
+ if (cqService != null) {
+ try {
+ cqService.closeNonDurableClientCqs(getProxyID());
+ }
+ catch (CqException ex) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_CQEXCEPTION_WHILE_CLOSING_NON_DURABLE_CQS_0, ex.getLocalizedMessage()));
+ }
+ }
+ }
+
+ private void destroyRQ() {
+ if (this._messageDispatcher == null) {
+ return;
+ }
+ try {
+ // Using Destroy Region bcoz this method is modified in HARegion so as
+ // not to distribute.
+ // For normal Regions , even the localDestroyRegion actually propagates
+ HARegionQueue rq = this._messageDispatcher._messageQueue;
+ rq.destroy();
+
+ // if (!rq.getRegion().isDestroyed()) {
+ // rq.getRegion().destroyRegion();
+ // }
+ }
+ catch (RegionDestroyedException rde) {
+ // throw rde;
+ }
+ catch (CancelException e) {
+ // throw e;
+ }
+ catch (Exception warning) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_EXCEPTION_IN_CLOSING_THE_UNDERLYING_HAREGION_OF_THE_HAREGIONQUEUE, this), warning);
+ }
+ }
+
+ public void registerInterestRegex(String regionName, String regex,
+ boolean isDurable) {
+ registerInterestRegex(regionName, regex, isDurable, true);
+ }
+
+ public void registerInterestRegex(String regionName, String regex,
+ boolean isDurable, boolean receiveValues) {
+ if (this.isPrimary) {
+ // Notify all secondaries and client of change in interest
+ notifySecondariesAndClient(regionName, regex, InterestResultPolicy.NONE,
+ isDurable, receiveValues, InterestType.REGULAR_EXPRESSION);
+ } else {
+ throw new IllegalStateException(LocalizedStrings.CacheClientProxy_NOT_PRIMARY.toLocalizedString());
+ }
+ }
+
+ public void registerInterest(String regionName, Object keyOfInterest,
+ InterestResultPolicy policy, boolean isDurable) {
+ registerInterest(regionName, keyOfInterest, policy, isDurable, true);
+ }
+
+ public void registerInterest(String regionName, Object keyOfInterest,
+ InterestResultPolicy policy, boolean isDurable,
+ boolean receiveValues) {
+ if (keyOfInterest instanceof String && keyOfInterest.equals("ALL_KEYS")) {
+ registerInterestRegex(regionName, ".*", isDurable, receiveValues);
+ } else if (keyOfInterest instanceof List) {
+ if (this.isPrimary) {
+ notifySecondariesAndClient(regionName, keyOfInterest, policy,
+ isDurable, receiveValues, InterestType.KEY);
+ } else {
+ throw new IllegalStateException(LocalizedStrings.CacheClientProxy_NOT_PRIMARY.toLocalizedString());
+ }
+ } else {
+ if (this.isPrimary) {
+ // Notify all secondaries and client of change in interest
+ notifySecondariesAndClient(regionName, keyOfInterest, policy,
+ isDurable, receiveValues, InterestType.KEY);
+
+ // Enqueue the initial value message for the client if necessary
+ if (policy == InterestResultPolicy.KEYS_VALUES) {
+ Get70 request = (Get70)Get70.getCommand();
+ LocalRegion lr = (LocalRegion) this._cache.getRegion(regionName);
+ Get70.Entry entry = request.getValueAndIsObject(lr, keyOfInterest, null,
+ null);
+ boolean isObject = entry.isObject;
+ byte[] value = null;
+ if (entry.value instanceof byte[]) {
+ value = (byte[])entry.value;
+ } else {
+ try {
+ value = CacheServerHelper.serialize(entry.value);
+ } catch (IOException e) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_THE_FOLLOWING_EXCEPTION_OCCURRED_0, entry.value), e);
+ }
+ }
+ VersionTag tag = entry.versionTag;
+ ClientUpdateMessage updateMessage = new ClientUpdateMessageImpl(
+ EnumListenerEvent.AFTER_CREATE, lr, keyOfInterest, value, null,
+ (isObject ? (byte) 0x01 : (byte) 0x00), null, this.proxyID,
+ new EventID(this._cache.getDistributedSystem()), tag);
+ CacheClientNotifier.routeSingleClientMessage(updateMessage, this.proxyID);
+ }
+ // Add the client to the region's filters
+ //addFilterRegisteredClients(regionName, keyOfInterest);
+ } else {
+ throw new IllegalStateException(LocalizedStrings.CacheClientProxy_NOT_PRIMARY.toLocalizedString());
+ }
+ }
+ }
+
+ private void notifySecondariesAndClient(String regionName,
+ Object keyOfInterest, InterestResultPolicy policy, boolean isDurable,
+ boolean receiveValues, int interestType) {
+ // Create a client interest message for the keyOfInterest
+ ClientInterestMessageImpl message = new ClientInterestMessageImpl(
+ new EventID(this._cache.getDistributedSystem()), regionName,
+ keyOfInterest, interestType, policy.getOrdinal(), isDurable,
+ !receiveValues, ClientInterestMessageImpl.REGISTER);
+
+ // Notify all secondary proxies of a change in interest
+ notifySecondariesOfInterestChange(message);
+
+ // Modify interest registration
+ if (keyOfInterest instanceof List) {
+ registerClientInterestList(regionName, (List) keyOfInterest, isDurable,
+ !receiveValues, true);
+ } else {
+ registerClientInterest(regionName, keyOfInterest, interestType,
+ isDurable, !receiveValues, true);
+ }
+
+ // Enqueue the interest registration message for the client.
+ // If the client is not 7.0.1 or greater and the key of interest is a list,
+ // then create an individual message for each entry in the list since the
+ // client doesn't support a ClientInterestMessageImpl containing a list.
+ if (Version.GFE_701.compareTo(this.clientVersion) > 0
+ && keyOfInterest instanceof List) {
+ for (Iterator i = ((List) keyOfInterest).iterator(); i.hasNext();) {
+ this._messageDispatcher.enqueueMessage(new ClientInterestMessageImpl(
+ new EventID(this._cache.getDistributedSystem()), regionName,
+ i.next(), interestType, policy.getOrdinal(), isDurable, !receiveValues,
+ ClientInterestMessageImpl.REGISTER));
+ }
+ } else {
+ this._messageDispatcher.enqueueMessage(message);
+ }
+ }
+
+ public void unregisterInterestRegex(String regionName, String regex,
+ boolean isDurable) {
+ unregisterInterestRegex(regionName, regex, isDurable, true);
+ }
+
+ public void unregisterInterestRegex(String regionName, String regex,
+ boolean isDurable, boolean receiveValues) {
+ if (this.isPrimary) {
+ notifySecondariesAndClient(regionName, regex, isDurable, receiveValues,
+ InterestType.REGULAR_EXPRESSION);
+ } else {
+ throw new IllegalStateException(LocalizedStrings.CacheClientProxy_NOT_PRIMARY.toLocalizedString());
+ }
+ }
+
+ public void unregisterInterest(String regionName, Object keyOfInterest,
+ boolean isDurable) {
+ unregisterInterest(regionName, keyOfInterest, isDurable, true);
+ }
+
+ public void unregisterInterest(String regionName, Object keyOfInterest,
+ boolean isDurable, boolean receiveValues) {
+ if (keyOfInterest instanceof String && keyOfInterest.equals("ALL_KEYS")) {
+ unregisterInterestRegex(regionName, ".*", isDurable, receiveValues);
+ } else {
+ if (this.isPrimary) {
+ notifySecondariesAndClient(regionName, keyOfInterest, isDurable,
+ receiveValues, InterestType.KEY);
+ } else {
+ throw new IllegalStateException(LocalizedStrings.CacheClientProxy_NOT_PRIMARY.toLocalizedString());
+ }
+ }
+ }
+
+ private void notifySecondariesAndClient(String regionName,
+ Object keyOfInterest, boolean isDurable, boolean receiveValues,
+ int interestType) {
+ // Notify all secondary proxies of a change in interest
+ ClientInterestMessageImpl message = new ClientInterestMessageImpl(
+ new EventID(this._cache.getDistributedSystem()), regionName,
+ keyOfInterest, interestType, (byte) 0, isDurable, !receiveValues,
+ ClientInterestMessageImpl.UNREGISTER);
+ notifySecondariesOfInterestChange(message);
+
+ // Modify interest registration
+ if (keyOfInterest instanceof List) {
+ unregisterClientInterest(regionName, (List) keyOfInterest, false);
+ } else {
+ unregisterClientInterest(regionName, keyOfInterest, interestType,
+ false);
+ }
+
+ // Enqueue the interest unregistration message for the client.
+ // If the client is not 7.0.1 or greater and the key of interest is a list,
+ // then create an individual message for each entry in the list since the
+ // client doesn't support a ClientInterestMessageImpl containing a list.
+ if (Version.GFE_701.compareTo(this.clientVersion) > 0
+ && keyOfInterest instanceof List) {
+ for (Iterator i = ((List) keyOfInterest).iterator(); i.hasNext();) {
+ this._messageDispatcher.enqueueMessage(new ClientInterestMessageImpl(
+ new EventID(this._cache.getDistributedSystem()), regionName,
+ i.next(), interestType, (byte) 0, isDurable, !receiveValues,
+ ClientInterestMessageImpl.UNREGISTER));
+ }
+ } else {
+ this._messageDispatcher.enqueueMessage(message);
+ }
+ }
+
+ protected void notifySecondariesOfInterestChange(ClientInterestMessageImpl message) {
+ if (logger.isDebugEnabled()) {
+ StringBuffer subBuffer = new StringBuffer();
+ if (message.isRegister()) {
+ subBuffer
+ .append("register ")
+ .append(message.getIsDurable() ? "" : "non-")
+ .append("durable interest in ");
+ } else {
+ subBuffer.append("unregister interest in ");
+ }
+ StringBuffer buffer = new StringBuffer();
+ buffer
+ .append(this)
+ .append(": Notifying secondary proxies to ")
+ .append(subBuffer.toString())
+ .append(message.getRegionName())
+ .append("->")
+ .append(message.getKeyOfInterest())
+ .append("->")
+ .append(InterestType.getString(message.getInterestType()));
+ logger.debug(buffer.toString());
+ }
+ this._cacheClientNotifier.deliverInterestChange(this.proxyID, message);
+ }
+
+ /*
+ protected void addFilterRegisteredClients(String regionName,
+ Object keyOfInterest) {
+ try {
+ this._cacheClientNotifier.addFilterRegisteredClients(regionName,
+ this.proxyID);
+ } catch (RegionDestroyedException e) {
+ logger.warn(LocalizedStrings.CacheClientProxy_0_INTEREST_REG_FOR_0_FAILED, regionName + "->" + keyOfInterest, e);
+ }
+ }
+ */
+
+ /**
+ * Registers interest in the input region name and key
+ *
+ * @param regionName
+ * The fully-qualified name of the region in which to register
+ * interest
+ * @param keyOfInterest
+ * The key in which to register interest
+ */
+ protected void registerClientInterest(String regionName,
+ Object keyOfInterest, int interestType, boolean isDurable,
+ boolean sendUpdatesAsInvalidates, boolean flushState)
+ {
+ ClientInterestList cil =
+ this.cils[RegisterInterestTracker.getInterestLookupIndex(
+ isDurable, false)];
+ cil.registerClientInterest(regionName, keyOfInterest, interestType, sendUpdatesAsInvalidates);
+ if (flushState) {
+ flushForInterestRegistration(regionName, this._cache.getDistributedSystem().getDistributedMember());
+ }
+ HARegionQueue queue = getHARegionQueue();
+ if (queue != null) { // queue is null during initialization
+ queue.setHasRegisteredInterest(true);
+ }
+ }
+
+ /**
+ * flush other regions to the given target. This is usually the member
+ * that is registering the interest. During queue creation it is the
+ * queue's image provider.
+ */
+ public void flushForInterestRegistration(String regionName, DistributedMember target) {
+ Region r = this._cache.getRegion(regionName);
+ if (r == null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Unable to find region '{}' to flush for interest registration", regionName);
+ }
+ } else if (r.getAttributes().getScope().isDistributed()) {
+ if (logger.isDebugEnabled()){
+ logger.debug("Flushing region '{}' for interest registration", regionName);
+ }
+ CacheDistributionAdvisee cd = (CacheDistributionAdvisee)r;
+ final StateFlushOperation sfo;
+ if (r instanceof PartitionedRegion) {
+ // need to flush all buckets. SFO should be changed to target buckets
+ // belonging to a particular PR, but it doesn't have that option right now
+ sfo = new StateFlushOperation(
+ this._cache.getDistributedSystem().getDistributionManager());
+ } else {
+ sfo = new StateFlushOperation((DistributedRegion)r);
+ }
+ try {
+ // bug 41681 - we need to flush any member that may have a cache operation
+ // in progress so that the changes are received there before returning
+ // from this method
+ InitialImageAdvice advice = cd.getCacheDistributionAdvisor().adviseInitialImage(null);
+ HashSet recips = new HashSet(advice.getReplicates());
+ recips.addAll(advice.getUninitialized());
+ recips.addAll(advice.getEmpties());
+ recips.addAll(advice.getPreloaded());
+ recips.addAll(advice.getOthers());
+ sfo.flush(recips,
+ target,
+ DistributionManager.HIGH_PRIORITY_EXECUTOR, true);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
+
+ /**
+ * Unregisters interest in the input region name and key
+ *
+ * @param regionName
+ * The fully-qualified name of the region in which to unregister
+ * interest
+ * @param keyOfInterest
+ * The key in which to unregister interest
+ * @param isClosing
+ * Whether the caller is closing
+ */
+ protected void unregisterClientInterest(String regionName,
+ Object keyOfInterest, int interestType, boolean isClosing)
+ {
+ // only unregister durable interest if isClosing and !keepalive
+ if (!isClosing /* explicit unregister */
+ || !getDurableKeepAlive() /* close and no keepAlive*/) {
+ this.cils[RegisterInterestTracker.durableInterestListIndex].
+ unregisterClientInterest(regionName, keyOfInterest, interestType);
+ }
+ // always unregister non durable interest
+ this.cils[RegisterInterestTracker.interestListIndex].
+ unregisterClientInterest(regionName, keyOfInterest, interestType);
+ }
+
+ /**
+ * Registers interest in the input region name and list of keys
+ *
+ * @param regionName
+ * The fully-qualified name of the region in which to register
+ * interest
+ * @param keysOfInterest
+ * The list of keys in which to register interest
+ */
+ protected void registerClientInterestList(String regionName,
+ List keysOfInterest, boolean isDurable, boolean sendUpdatesAsInvalidates,
+ boolean flushState)
+ {
+ // we only use two interest lists to map the non-durable and durable
+ // identifiers to their interest settings
+ ClientInterestList cil =
+ this.cils[RegisterInterestTracker.getInterestLookupIndex(
+ isDurable, false/*sendUpdatesAsInvalidates*/)];
+ cil.registerClientInterestList(regionName, keysOfInterest, sendUpdatesAsInvalidates);
+ if (getHARegionQueue() != null) {
+ if (flushState) {
+ flushForInterestRegistration(regionName, this._cache.getDistributedSystem().getDistributedMember());
+ }
+ getHARegionQueue().setHasRegisteredInterest(true);
+ }
+ }
+
+ /**
+ * Unregisters interest in the input region name and list of keys
+ *
+ * @param regionName
+ * The fully-qualified name of the region in which to unregister
+ * interest
+ * @param keysOfInterest
+ * The list of keys in which to unregister interest
+ * @param isClosing
+ * Whether the caller is closing
+ */
+ protected void unregisterClientInterest(String regionName,
+ List keysOfInterest, boolean isClosing)
+ {
+ // only unregister durable interest if isClosing and !keepalive
+ if (!isClosing /* explicit unregister */
+ || !getDurableKeepAlive() /* close and no keepAlive*/) {
+ this.cils[RegisterInterestTracker.durableInterestListIndex].
+ unregisterClientInterestList(regionName, keysOfInterest);
+ }
+ // always unregister non durable interest
+ this.cils[RegisterInterestTracker.interestListIndex].
+ unregisterClientInterestList(regionName, keysOfInterest);
+ }
+
+
+ /** sent by the cache client notifier when there is an interest registration change */
+ protected void processInterestMessage(ClientInterestMessageImpl message) {
+ int interestType = message.getInterestType();
+ String regionName = message.getRegionName();
+ Object key = message.getKeyOfInterest();
+ if (message.isRegister()) {
+ // Register interest in this region->key
+ if (key instanceof List) {
+ registerClientInterestList(regionName, (List) key,
+ message.getIsDurable(), message.getForUpdatesAsInvalidates(), true);
+ } else {
+ registerClientInterest(regionName, key, interestType,
+ message.getIsDurable(), message.getForUpdatesAsInvalidates(), true);
+ }
+
+ // Add the client to the region's filters
+ //addFilterRegisteredClients(regionName, key);
+
+ if (logger.isDebugEnabled()) {
+ StringBuffer buffer = new StringBuffer();
+ buffer
+ .append(this)
+ .append(": Interest listener registered ")
+ .append(message.getIsDurable() ? "" : "non-")
+ .append("durable interest in ")
+ .append(message.getRegionName())
+ .append("->")
+ .append(message.getKeyOfInterest())
+ .append("->")
+ .append(InterestType.getString(message.getInterestType()));
+ logger.debug(buffer.toString());
+ }
+ } else {
+ // Unregister interest in this region->key
+ if (key instanceof List) {
+ unregisterClientInterest(regionName, (List) key, false);
+ } else {
+ unregisterClientInterest(regionName, key, interestType, false);
+ }
+
+ if (logger.isDebugEnabled()) {
+ StringBuffer buffer = new StringBuffer();
+ buffer
+ .append(this)
+ .append(": Interest listener unregistered interest in ")
+ .append(message.getRegionName())
+ .append("->")
+ .append(message.getKeyOfInterest())
+ .append("->")
+ .append(InterestType.getString(message.getInterestType()));
+ logger.debug(buffer.toString());
+ }
+ }
+ }
+
+ private boolean postDeliverAuthCheckPassed(ClientUpdateMessage clientMessage) {
+ // Before adding it in the queue for dispatching, check for post
+ // process authorization
+ if (AcceptorImpl.isAuthenticationRequired()
+ && this.postAuthzCallback == null
+ && AcceptorImpl.isPostAuthzCallbackPresent()) {
+ // security is on and callback is null: it means multiuser mode.
+ ClientUpdateMessageImpl cumi = (ClientUpdateMessageImpl)clientMessage;
+
+ CqNameToOp clientCq = cumi.getClientCq(this.proxyID);
+
+ if (clientCq != null && !clientCq.isEmpty()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("CCP clientCq size before processing auth {}", clientCq.size());
+ }
+ String[] regionNameHolder = new String[1];
+ OperationContext opctxt = getOperationContext(clientMessage,
+ regionNameHolder);
+ if (opctxt == null) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.CacheClientProxy__0_NOT_ADDING_MESSAGE_TO_QUEUE_1_BECAUSE_THE_OPERATION_CONTEXT_OBJECT_COULD_NOT_BE_OBTAINED_FOR_THIS_CLIENT_MESSAGE,
+ new Object[] {this, clientMessage}));
+ return false;
+ }
+
+ String[] cqNames = clientCq.getNames();
+ if (logger.isDebugEnabled()) {
+ logger.debug("CCP clientCq names array size {}", cqNames.length);
+ }
+ for (int i = 0; i < cqNames.length; i++) {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("CCP clientCq name {}", cqNames[i]);
+ }
+ boolean isAuthorized = false;
+
+ if (this.proxyID.isDurable() && this.getDurableKeepAlive()
+ && this._isPaused) {
+ // need to take lock as we may be reinitializing proxy cache
+ synchronized (this.clientUserAuthsLock) {
+ AuthorizeRequestPP postAuthCallback = this.clientUserAuths
+ .getUserAuthAttributes(cqNames[i]).getPostAuthzRequest();
+ if (logger.isDebugEnabled() && postAuthCallback == null) {
+ logger.debug("CCP clientCq post callback is null");
+ }
+ if (postAuthCallback != null && postAuthCallback
+ .getPostAuthzCallback().authorizeOperation(
+ regionNameHolder[0], opctxt)) {
+ isAuthorized = true;
+ }
+ }
+ } else {
+ UserAuthAttributes userAuthAttributes = this.clientUserAuths
+ .getUserAuthAttributes(cqNames[i]);
+
+ AuthorizeRequestPP postAuthCallback = userAuthAttributes
+ .getPostAuthzRequest();
+ if (postAuthCallback == null && logger.isDebugEnabled()) {
+ logger.debug("CCP clientCq post callback is null");
+ }
+ if (postAuthCallback != null && postAuthCallback
+ .getPostAuthzCallback().authorizeOperation(
+ regionNameHolder[0], opctxt)) {
+ isAuthorized = true;
+ }
+ }
+
+ if (!isAuthorized) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.CacheClientProxy__0_NOT_ADDING_CQ_MESSAGE_TO_QUEUE_1_BECAUSE_AUTHORIZATION_FAILED,
+ new Object[] {this, clientMessage}));
+ clientCq.delete(cqNames[i]);
+ }
+ } catch (Exception ex) {
+ // ignore...
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("CCP clientCq size after processing auth {}", clientCq.size());
+ }
+ }
+ // again need to check as there may be no CQ available
+ if (!clientMessage.hasCqs(this.proxyID)) {
+ this._statistics.incMessagesNotQueuedNotInterested();
+ if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) {
+ logger.debug("{}: Not adding message to queue. It is not interested in this region and key: {}", clientMessage);
+ }
+ return false;
+ }
+ }
+ }
+ else if (this.postAuthzCallback != null) {
+ String[] regionNameHolder = new String[1];
+ boolean isAuthorize = false;
+ OperationContext opctxt = getOperationContext(clientMessage,
+ regionNameHolder);
+ if (opctxt == null) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy__0_NOT_ADDING_MESSAGE_TO_QUEUE_1_BECAUSE_THE_OPERATION_CONTEXT_OBJECT_COULD_NOT_BE_OBTAINED_FOR_THIS_CLIENT_MESSAGE, new Object[] {this, clientMessage}));
+ return false;
+ }
+ if (logger.isTraceEnabled()){
+ logger.trace("{}: Invoking authorizeOperation for message: {}", this, clientMessage);
+ }
+
+ if (this.proxyID.isDurable() && this.getDurableKeepAlive()
+ && this._isPaused) {
+ synchronized (this.clientUserAuthsLock) {
+ isAuthorize = this.postAuthzCallback.authorizeOperation(
+ regionNameHolder[0], opctxt);
+ }
+ } else {
+ isAuthorize = this.postAuthzCallback.authorizeOperation(
+ regionNameHolder[0], opctxt);
+ }
+ if (!isAuthorize) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy__0_NOT_ADDING_MESSAGE_TO_QUEUE_1_BECAUSE_AUTHORIZATION_FAILED, new Object[] {this, clientMessage}));
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Delivers the message to the client representing this client proxy.
+ * @param conflatable
+ */
+ protected void deliverMessage(Conflatable conflatable)
+ {
+ ClientUpdateMessage clientMessage = null;
+ if(conflatable instanceof HAEventWrapper) {
+ clientMessage = ((HAEventWrapper)conflatable).getClientUpdateMessage();
+ } else {
+ clientMessage = (ClientUpdateMessage)conflatable;
+ }
+
+ this._statistics.incMessagesReceived();
+
+ if (clientMessage.needsNoAuthorizationCheck() || postDeliverAuthCheckPassed(clientMessage)) {
+ // If dispatcher is getting initialized, add the event to temporary queue.
+ if (this.messageDispatcherInit) {
+ synchronized (this.queuedEventsSync) {
+ if (this.messageDispatcherInit) { // Check to see value did not changed while getting the synchronize lock.
+ if (logger.isDebugEnabled()) {
+ logger.debug("Message dispatcher for proxy {} is getting initialized. Adding message to the queuedEvents.", this);
+ }
+ this.queuedEvents.add(conflatable);
+ return;
+ }
+ }
+ }
+
+ if (this._messageDispatcher != null) {
+ this._messageDispatcher.enqueueMessage(conflatable);
+ } else {
+ this._statistics.incMessagesFailedQueued();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Message is not added to the queue. Message dispatcher for proxy: {} doesn't exist.", this);
+ }
+ }
+ } else {
+ this._statistics.incMessagesFailedQueued();
+ }
+ }
+
+ protected void sendMessageDirectly(ClientMessage message) {
+ // Send the message directly if the connection exists
+ // (do not go through the queue).
+ if (logger.isDebugEnabled()){
+ logger.debug("About to send message directly to {}", this);
+ }
+ if (this._messageDispatcher != null && this._socket != null && !this._socket.isClosed()) {
+ // If the socket is open, send the message to it
+ this._messageDispatcher.sendMessageDirectly(message);
+ if (logger.isDebugEnabled()){
+ logger.debug("Sent message directly to {}", this);
+ }
+ } else {
+ // Otherwise just reset the ping counter
+ resetPingCounter();
+ if (logger.isDebugEnabled()){
+ logger.debug("Skipped sending message directly to {}", this);
+ }
+ }
+ }
+
+ private OperationContext getOperationContext(ClientMessage cmsg,
+ String[] regionNameHolder) {
+ ClientUpdateMessageImpl cmsgimpl = (ClientUpdateMessageImpl)cmsg;
+ OperationContext opctxt = null;
+ // TODO SW: Special handling for DynamicRegions; this should be reworked
+ // when DynamicRegion API is deprecated
+ String regionName = cmsgimpl.getRegionName();
+ regionNameHolder[0] = regionName;
+ if (cmsgimpl.isCreate()) {
+ if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
+ regionNameHolder[0] = (String)cmsgimpl.getKeyOfInterest();
+ opctxt = new RegionCreateOperationContext(true);
+ }
+ else {
+ PutOperationContext tmp = new PutOperationContext(cmsgimpl.getKeyOfInterest(), cmsgimpl
+ .getValue(), cmsgimpl.valueIsObject(), PutOperationContext.CREATE,
+ true);
+ tmp.setCallbackArg(cmsgimpl.getCallbackArgument());
+ opctxt = tmp;
+ }
+ }
+ else if (cmsgimpl.isUpdate()) {
+ if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
+ regionNameHolder[0] = (String)cmsgimpl.getKeyOfInterest();
+ opctxt = new RegionCreateOperationContext(true);
+ }
+ else {
+ PutOperationContext tmp = new PutOperationContext(cmsgimpl.getKeyOfInterest(), cmsgimpl
+ .getValue(), cmsgimpl.valueIsObject(), PutOperationContext.UPDATE,
+ true);
+ tmp.setCallbackArg(cmsgimpl.getCallbackArgument());
+ opctxt = tmp;
+ }
+ }
+ else if (cmsgimpl.isDestroy()) {
+ if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
+ regionNameHolder[0] = (String)cmsgimpl.getKeyOfInterest();
+ opctxt = new RegionDestroyOperationContext(true);
+ }
+ else {
+ DestroyOperationContext tmp = new DestroyOperationContext(cmsgimpl.getKeyOfInterest(), true);
+ tmp.setCallbackArg(cmsgimpl.getCallbackArgument());
+ opctxt = tmp;
+ }
+ }
+ else if (cmsgimpl.isDestroyRegion()) {
+ opctxt = new RegionDestroyOperationContext(true);
+ }
+ else if (cmsgimpl.isInvalidate()) {
+ InvalidateOperationContext tmp = new InvalidateOperationContext(cmsgimpl.getKeyOfInterest(), true);
+ tmp.setCallbackArg(cmsgimpl.getCallbackArgument());
+ opctxt = tmp;
+ }
+ else if (cmsgimpl.isClearRegion()) {
+ RegionClearOperationContext tmp = new RegionClearOperationContext(true);
+ tmp.setCallbackArg(cmsgimpl.getCallbackArgument());
+ opctxt = tmp;
+ }
+ return opctxt;
+ }
+
+ /**
+ * Initializes the message dispatcher thread. The
+ * <code>MessageDispatcher</code> processes the message queue.
+ *
+ * @throws CacheException
+ */
+ public void initializeMessageDispatcher() throws CacheException
+ {
+ this.messageDispatcherInit = true; // Initialization process.
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Initializing message dispatcher with capacity of {} entries", this, _maximumMessageCount);
+ }
+ String name = "Client Message Dispatcher for "
+ + getProxyID().getDistributedMember() + (isDurable()? " (" + getDurableId()+")" : "");
+ this._messageDispatcher = new MessageDispatcher(this, name);
+
+ //Fix for 41375 - drain as many of the queued events
+ //as we can without synchronization.
+ if (logger.isDebugEnabled()) {
+ logger.debug("{} draining {} events from init queue into intialized queue", this, this.queuedEvents.size());
+ }
+ Conflatable nextEvent;
+ while((nextEvent = queuedEvents.poll()) != null) {
+ this._messageDispatcher.enqueueMessage(nextEvent);
+ }
+
+ //Now finish emptying the queue with synchronization to make
+ //sure we don't miss any events.
+ synchronized (this.queuedEventsSync){
+ while((nextEvent = queuedEvents.poll()) != null) {
+ this._messageDispatcher.enqueueMessage(nextEvent);
+ }
+
+ this.messageDispatcherInit = false; // Done initialization.
+ }
+ } finally {
+ if (this.messageDispatcherInit) { // If its not successfully completed.
+ this._statistics.close();
+ }
+ }
+ }
+
+ protected void startOrResumeMessageDispatcher(boolean processedMarker) {
+ // Only start or resume the dispatcher if it is Primary
+ if (this.isPrimary) {
+ // Add the marker to the queue
+ if (!processedMarker) {
+ EventID eventId = new EventID(this._cache.getDistributedSystem());
+ this._messageDispatcher.enqueueMarker(new ClientMarkerMessageImpl(eventId));
+ }
+
+ // Set the message queue to primary.
+ this._messageDispatcher._messageQueue.setPrimary(true);
+
+ // Start or resume the dispatcher
+ synchronized (this._messageDispatcher._pausedLock) {
+ if (this.isPaused()) {
+ // It is paused, resume it
+ this.setPaused(false);
+ if (this._messageDispatcher.isStopped()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Starting dispatcher", this);
+ }
+ this._messageDispatcher.start();
+ }
+ else {
+ // ARB: Initialize transient fields.
+ this._messageDispatcher.initializeTransients();
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Resuming dispatcher", this);
+ }
+ this._messageDispatcher.resumeDispatching();
+ }
+ } else if (!this._messageDispatcher.isAlive()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Starting dispatcher", this);
+ }
+ this._messageDispatcher.start();
+ }
+ }
+ }
+ }
+
+ /*
+ * Returns whether the client represented by this <code> CacheClientProxy
+ * </code> has registered interest in anything. @return whether the client
+ * represented by this <code> CacheClientProxy </code> has registered interest
+ * in anything
+ */
+ protected boolean hasRegisteredInterested()
+ {
+ return
+ this.cils[RegisterInterestTracker.interestListIndex].hasInterest() ||
+ this.cils[RegisterInterestTracker.durableInterestListIndex].hasInterest();
+ }
+
+ /**
+ * Returns a string representation of the proxy
+ */
+ @Override
+ public String toString()
+ {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append("CacheClientProxy[")
+ // .append("client proxy id=")
+ .append(this.proxyID)
+ // .append("; client host name=")
+ // .append(this._socket.getInetAddress().getCanonicalHostName())
+ // .append("; client host address=")
+ // .append(this._remoteHostAddress)
+ .append("; port=").append(this._socket.getPort())
+ .append("; primary=").append(isPrimary)
+ .append("; version=").append(clientVersion)
+ .append("]");
+ return buffer.toString();
+ }
+
+ public String getState(){
+ StringBuffer buffer = new StringBuffer();
+ buffer.append("CacheClientProxy[")
+ // .append("client proxy id=")
+ .append(this.proxyID)
+ // .append("; client host name=")
+ // .append(this._socket.getInetAddress().getCanonicalHostName())
+ // .append("; client host address=")
+ // .append(this._remoteHostAddress)
+ .append("; port=").append(this._socket.getPort())
+ .append("; primary=").append(isPrimary)
+ .append("; version=").append(clientVersion)
+ .append("; paused=").append(isPaused())
+ .append("; alive=").append(isAlive())
+ .append("; connected=").append(isConnected())
+ .append("; isMarkedForRemoval=").append(isMarkedForRemoval)
+ .append("]");
+ return buffer.toString();
+ }
+
+ public boolean isPrimary()
+ {
+ //boolean primary = this._messageDispatcher.isAlive()
+ // || this._messageDispatcher._messageQueue.isPrimary();
+ boolean primary = this.isPrimary;
+ //System.out.println(this + ": DISPATCHER IS ALIVE: " + this._messageDispatcher.isAlive());
+ //System.out.println(this + ": DISPATCHER QUEUE IS PRIMARY: " + this._messageDispatcher._messageQueue.isPrimary());
+ //System.out.println(this + ": IS PRIMARY: " + primary);
+ return primary;
+ // return this.isPrimary ;
+ }
+
+ protected boolean basicIsPrimary() {
+ return this.isPrimary;
+ }
+
+ protected void setPrimary(boolean isPrimary) {
+ this.isPrimary = isPrimary;
+ }
+
+ // private static int nextId = 0;
+ // static protected int getNextId() {
+ // synchronized (CacheClientProxy.class) {
+ // return ++nextId;
+ // }
+ // }
+ /*
+ * Return this client's HA region queue
+ * @returns - HARegionQueue of the client
+ */
+ public HARegionQueue getHARegionQueue() {
+ if (this._messageDispatcher != null){
+ return _messageDispatcher._messageQueue;
+ }
+ return null;
+ }
+
+
+ /**
+ * Reinitialize a durable <code>CacheClientProxy</code> with a new client.
+ * @param socket
+ * The socket between the server and the client
+ * @param ip
+ * whether this proxy represents the primary
+ */
+ protected void reinitialize(Socket socket, ClientProxyMembershipID proxyId,
+ Cache cache, boolean ip, byte cc, Version ver) {
+ // Re-initialize transient fields
+ initializeTransientFields(socket, proxyId, ip, cc, ver);
+ getCacheClientNotifier().getAcceptorStats().incCurrentQueueConnections();
+
+
+ // Cancel expiration task
+ cancelDurableExpirationTask(true);
+
+ // Set the message dispatcher's primary flag. This could go from primary
+ // to secondary
+ this._messageDispatcher._messageQueue.setPrimary(ip);
+ this._messageDispatcher._messageQueue.setClientConflation(cc);
+
+ reinitializeClientAuths();
+ this.creationDate = new Date();
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Has been reinitialized", this);
+ }
+ }
+
+ protected boolean isDurable() {
+ return getProxyID().isDurable();
+ }
+
+ protected String getDurableId() {
+ return getProxyID().getDurableId();
+ }
+
+ protected int getDurableTimeout() {
+ return getProxyID().getDurableTimeout();
+ }
+
+ private boolean getDurableKeepAlive() {
+ return this.keepalive;
+ }
+
+ protected String getHARegionName() {
+ return getProxyID().getHARegionName();
+ }
+
+ public Region getHARegion() {
+ return this._messageDispatcher._messageQueue.getRegion();
+ }
+
+ public Version getVersion() {
+ return this.clientVersion;
+ }
+
+ protected void scheduleDurableExpirationTask() {
+ SystemTimer.SystemTimerTask task = new SystemTimer.SystemTimerTask() {
+ @Override
+ public void run2() {
+ _durableExpirationTask.compareAndSet(this, null);
+ logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0__THE_EXPIRATION_TASK_HAS_FIRED_SO_THIS_PROXY_IS_BEING_TERMINATED, CacheClientProxy.this));
+ // Remove the proxy from the CacheClientNofier's registry
+ getCacheClientNotifier().removeClientProxy(CacheClientProxy.this);
+ getCacheClientNotifier().durableClientTimedOut(CacheClientProxy.this.proxyID);
+
+ // Close the proxy
+ terminateDispatching(false);
+ _cacheClientNotifier._statistics.incQueueDroppedCount();
+
+ /**
+ * Setting the expiration task to null again and cancelling existing
+ * one, if any. See #50894.
+ * <p/>
+ * The message dispatcher may again set the expiry task in below path:
+ * <code>
+ * com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy.scheduleDurableExpirationTask(CacheClientProxy.java:2020)
+ * com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy.pauseDispatching(CacheClientProxy.java:924)
+ * com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy$MessageDispatcher.pauseOrUnregisterProxy(CacheClientProxy.java:2813)
+ * com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy$MessageDispatcher.run(CacheClientProxy.java:2692)
+ * </code>
+ * <p/>
+ * This is because message dispatcher may get an IOException with
+ * "Proxy closing due to socket being closed locally" during/after
+ * terminateDispatching(false) above.
+ */
+ Object task = _durableExpirationTask.getAndSet(null);
+ if (task != null) {
+ ((SystemTimerTask)task).cancel();
+ }
+ }
+
+ };
+ if(this._durableExpirationTask.compareAndSet(null, task)) {
+ _cache.getCCPTimer().schedule(task,
+ getDurableTimeout()*1000L);
+ }
+ }
+
+ protected void cancelDurableExpirationTask(boolean logMessage) {
+ SystemTimer.SystemTimerTask task = (SystemTimerTask) _durableExpirationTask.getAndSet(null);
+ if (task != null) {
+ if (logMessage) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_CANCELLING_EXPIRATION_TASK_SINCE_THE_CLIENT_HAS_RECONNECTED, this));
+ }
+ task.cancel();
+ }
+ }
+
+ /**
+ * Class <code>ClientInterestList</code> provides a convenient interface
+ * for manipulating client interest information.
+ */
+ static protected class ClientInterestList
+ {
+
+ final CacheClientProxy ccp;
+
+ final Object id;
+
+ /**
+ * An object used for synchronizing the interest lists
+ */
+ final private Object interestListLock = new Object();
+
+ /**
+ * Regions that this client is interested in
+ */
+ final protected Set<String> regions = new HashSet<String>();
+
+ /**
+ * Constructor.
+ */
+ protected ClientInterestList(CacheClientProxy ccp, Object interestID) {
+ this.ccp = ccp;
+ this.id = interestID;
+ // this.id = getNextId();
+ }
+
+ /**
+ * Registers interest in the input region name and key
+ */
+ protected void registerClientInterest(String regionName,
+ Object keyOfInterest, int interestType, boolean sendUpdatesAsInvalidates)
+ {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: registerClientInterest region={} key={}", ccp, regionName, keyOfInterest);
+ }
+ Set keysRegistered = null;
+ synchronized(this.interestListLock) {
+ LocalRegion r = (LocalRegion)this.ccp._cache.getRegion(regionName, true);
+ if (r == null) {
+ throw new RegionDestroyedException("Region could not be found for interest registration", regionName);
+ }
+ if ( ! (r instanceof CacheDistributionAdvisee) ) {
+ throw new IllegalArgumentException("region " + regionName + " is not distributed and does not support interest registration");
+ }
+ FilterProfile p = r.getFilterProfile();
+ keysRegistered = p.registerClientInterest(id, keyOfInterest, interestType, sendUpdatesAsInvalidates);
+ regions.add(regionName);
+ }
+ // Perform actions if any keys were registered
+ if ((keysRegistered != null) && containsInterestRegistrationListeners()
+ && !keysRegistered.isEmpty()) {
+ handleInterestEvent(regionName, keysRegistered, interestType, true);
+ }
+ }
+
+
+ protected FilterProfile getProfile(String regionName) {
+ try {
+ return this.ccp._cache.getFilterProfile(regionName);
+ } catch (CacheClosedException e) {
+ return null;
+ }
+ }
+
+ /**
+ * Unregisters interest in the input region name and key
+ *
+ * @param regionName
+ * The fully-qualified name of the region in which to unregister
+ * interest
+ * @param keyOfInterest
+ * The key in which to unregister interest
+ */
+ protected void unregisterClientInterest(String regionName,
+ Object keyOfInterest, int interestType)
+ {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: unregisterClientInterest region={} key={}", ccp, regionName, keyOfInterest);
+ }
+ FilterProfile p = getProfile(regionName);
+ Set keysUnregistered = null;
+ synchronized(this.interestListLock) {
+ if (p != null) {
+ keysUnregistered = p.unregisterClientInterest(
+ id, keyOfInterest, interestType);
+ if (!p.hasInterestFor(id)) {
+ this.regions.remove(regionName);
+ }
+ } else {
+ this.regions.remove(regionName);
+ }
+ }
+ if (keysUnregistered != null && !keysUnregistered.isEmpty()) {
+ handleInterestEvent(regionName, keysUnregistered, interestType, false);
+ }
+ }
+
+ /**
+ * Registers interest in the input region name and list of keys
+ *
+ * @param regionName
+ * The fully-qualified name of the region in which to register
+ * interest
+ * @param keysOfInterest
+ * The list of keys in which to register interest
+ */
+ protected void registerClientInterestList(String regionName,
+ List keysOfInterest, boolean sendUpdatesAsInvalidates) {
+ FilterProfile p = getProfile(regionName);
+ if (p == null) {
+ throw new RegionDestroyedException("Region not found during client interest registration", regionName);
+ }
+ Set keysRegistered = null;
+ synchronized(this.interestListLock) {
+ keysRegistered = p.registerClientInterestList(id, keysOfInterest, sendUpdatesAsInvalidates);
+ regions.add(regionName);
+ }
+ // Perform actions if any keys were registered
+ if (containsInterestRegistrationListeners() && !keysRegistered.isEmpty()) {
+ handleInterestEvent(regionName, keysRegistered, InterestType.KEY, true);
+ }
+ }
+
+ /**
+ * Unregisters interest in the input region name and list of keys
+ *
+ * @param regionName
+ * The fully-qualified name of the region in which to unregister
+ * interest
+ * @param keysOfInterest
+ * The list of keys in which to unregister interest
+ */
+ protected void unregisterClientInterestList(String regionName,
+ List keysOfInterest)
+ {
+ FilterProfile p = getProfile(regionName);
+ Set keysUnregistered = null;
+ synchronized(this.interestListLock) {
+ if (p != null) {
+ keysUnregistered = p.unregisterClientInterestList(
+ id, keysOfInterest);
+ if (!p.hasInterestFor(id)) {
+ regions.remove(regionName);
+ }
+ } else {
+ regions.remove(regionName);
+ }
+ }
+ // Perform actions if any keys were unregistered
+ if (!keysUnregistered.isEmpty()) {
+ handleInterestEvent(regionName, keysUnregistered, InterestType.KEY,false);
+ }
+ }
+
+ /*
+ * Returns whether this interest list has any keys, patterns or filters of
+ * interest. It answers the question: Are any clients being notified because
+ * of this interest list? @return whether this interest list has any keys,
+ * patterns or filters of interest
+ */
+ protected boolean hasInterest() {
+ return regions.size() > 0;
+ }
+
+ protected void clearClientInterestList() {
+ boolean isClosed = ccp.getCache().isClosed();
+
+ synchronized(this.interestListLock) {
+ for (String regionName: regions) {
+ FilterProfile p = getProfile(regionName);
+ if (p == null) {
+ continue;
+ }
+ if (!isClosed) {
+ if (p.hasAllKeysInterestFor(id)) {
+ Set allKeys = new HashSet();
+ allKeys.add(".*");
+ allKeys = Collections.unmodifiableSet(allKeys);
+ handleInterestEvent(regionName, allKeys,
+ InterestType.REGULAR_EXPRESSION, false);
+ }
+ Set keysOfInterest = p.getKeysOfInterestFor(id);
+ if (keysOfInterest != null && keysOfInterest.size() > 0) {
+ handleInterestEvent(regionName, keysOfInterest,
+ InterestType.KEY, false);
+ }
+ Map<String,Pattern> patternsOfInterest =
+ p.getPatternsOfInterestFor(id);
+ if (patternsOfInterest != null && patternsOfInterest.size() > 0) {
+ handleInterestEvent(regionName, patternsOfInterest.keySet(),
+ InterestType.REGULAR_EXPRESSION, false);
+ }
+ }
+ p.clearInterestFor(id);
+ }
+ regions.clear();
+ }
+ }
+
+
+ private void handleInterestEvent(String regionName, Set keysOfInterest,
+ int interestType, boolean isRegister) {
+ // Notify the region about this register interest event if:
+ // - the application has requested it
+ // - this is a primary CacheClientProxy (otherwise multiple notifications
+ // may occur)
+ // - it is a key interest type (regex is currently not supported)
+ InterestRegistrationEvent event = null;
+ if (NOTIFY_REGION_ON_INTEREST && this.ccp.isPrimary()
+ && interestType == InterestType.KEY) {
+ event = new InterestRegistrationEventImpl(this.ccp, regionName,
+ keysOfInterest, interestType, isRegister);
+ try {
+ notifyRegionOfInterest(event);
+ }
+ catch (Exception e) {
+ logger.warn(LocalizedStrings.CacheClientProxy_REGION_NOTIFICATION_OF_INTEREST_FAILED, e);
+ }
+ }
+ // Invoke interest registration listeners
+ if (containsInterestRegistrationListeners()) {
+ if (event == null) {
+ event = new InterestRegistrationEventImpl(this.ccp, regionName,
+ keysOfInterest, interestType, isRegister);
+ }
+ notifyInterestRegistrationListeners(event);
+ }
+ }
+
+ private void notifyRegionOfInterest(InterestRegistrationEvent event) {
+ this.ccp.getCacheClientNotifier().handleInterestEvent(event);
+ }
+
+ private void notifyInterestRegistrationListeners(
+ InterestRegistrationEvent event) {
+ this.ccp.getCacheClientNotifier().notifyInterestRegistrationListeners(
+ event);
+ }
+
+ private boolean containsInterestRegistrationListeners() {
+ return this.ccp.getCacheClientNotifier()
+ .containsInterestRegistrationListeners();
+ }
+ }
+
+
+ /**
+ * Class <code>MessageDispatcher</code> is a <code>Thread</code> that
+ * processes messages bound for the client by taking messsages from the
+ * message queue and sending them to the client over the socket.
+ */
+ static class MessageDispatcher extends Thread
+ {
+
+ /**
+ * The queue of messages to be sent to the client
+ */
+ protected final HARegionQueue _messageQueue;
+
+ // /**
+ // * An int used to keep track of the number of messages dropped for logging
+ // * purposes. If greater than zero then a warning has been logged about
+ // * messages being dropped.
+ // */
+ // private int _numberOfMessagesDropped = 0;
+
+ /**
+ * The proxy for which this dispatcher is processing messages
+ */
+ private final CacheClientProxy _proxy;
+
+ // /**
+ // * The conflator faciliates message conflation
+ // */
+ // protected BridgeEventConflator _eventConflator;
+
+ /**
+ * Whether the dispatcher is stopped
+ */
+ private volatile boolean _isStopped = true;
+
+ /**
+ * @guarded.By _pausedLock
+ */
+ //boolean _isPausedDispatcher = false;
+
+ /**
+ * A lock object used to control pausing this dispatcher
+ */
+ protected final Object _pausedLock = new Object();
+
+ /**
+ * An object used to protect when dispatching is being stopped.
+ */
+ private final Object _stopDispatchingLock = new Object();
+
+ private final ReadWriteLock socketLock = new ReentrantReadWriteLock();
+
+ private final Lock socketWriteLock = socketLock.writeLock();
+ // /**
+ // * A boolean verifying whether a warning has already been issued if the
+ // * message queue has reached its capacity.
+ // */
+ // private boolean _messageQueueCapacityReachedWarning = false;
+
+ /**
+ * Constructor.
+ *
+ * @param proxy
+ * The <code>CacheClientProxy</code> for which this dispatcher is
+ * processing messages
+ * @param name thread name for this dispatcher
+
<TRUNCATED>