You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/22 18:51:00 UTC
[03/17] geode git commit: Cleanup CacheClientNotifier
http://git-wip-us.apache.org/repos/asf/geode/blob/eeab2576/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index e79bfbd..4bd4970 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -93,7 +93,6 @@ import org.apache.geode.internal.cache.ClientRegionEventImpl;
import org.apache.geode.internal.cache.ClientServerObserver;
import org.apache.geode.internal.cache.ClientServerObserverHolder;
import org.apache.geode.internal.cache.Conflatable;
-import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
@@ -120,29 +119,158 @@ import org.apache.geode.security.AuthenticationFailedException;
import org.apache.geode.security.AuthenticationRequiredException;
/**
- * Class <code>CacheClientNotifier</code> works on the server and manages client socket connections
- * to clients requesting notification of updates and notifies them when updates occur.
+ * Class {@code CacheClientNotifier} works on the server and manages client socket connections to
+ * clients requesting notification of updates and notifies them when updates occur.
*
* @since GemFire 3.2
*/
-@SuppressWarnings({"synthetic-access", "deprecation"})
public class CacheClientNotifier {
private static final Logger logger = LogService.getLogger();
private static volatile CacheClientNotifier ccnSingleton;
/**
- * Factory method to construct a CacheClientNotifier <code>CacheClientNotifier</code> instance.
+ * The map of known {@code CacheClientProxy} instances. Maps ClientProxyMembershipID to
+ * CacheClientProxy. Note that the keys in this map are not updated when a durable client
+ * reconnects. To make sure you get the updated ClientProxyMembershipID use this map to lookup the
+ * CacheClientProxy and then call getProxyID on it.
+ * <p>
+ * NOTE: package-private to avoid synthetic accessor
+ */
+ final ConcurrentMap/* <ClientProxyMembershipID, CacheClientProxy> */ clientProxies =
+ new ConcurrentHashMap();
+
+ /**
+ * The map of {@code CacheClientProxy} instances which are getting initialized. Maps
+ * ClientProxyMembershipID to CacheClientProxy.
+ */
+ private final ConcurrentMap/* <ClientProxyMembershipID, CacheClientProxy> */ initClientProxies =
+ new ConcurrentHashMap();
+
+ private final Set<ClientProxyMembershipID> timedOutDurableClientProxies = new HashSet<>();
+
+ /**
+ * The GemFire {@code InternalCache}. Note that since this is a singleton class you should not use
+ * a direct reference to cache in CacheClientNotifier code. Instead, you should always use
+ * {@code getCache()}
+ */
+ private InternalCache cache; // TODO: fix synchronization of cache
+
+ private InternalLogWriter logWriter;
+
+ /**
+ * The GemFire security {@code LogWriter}
+ */
+ private InternalLogWriter securityLogWriter;
+
+ /** the maximum number of messages that can be enqueued in a client-queue. */
+ private final int maximumMessageCount;
+
+ /**
+ * the time (in seconds) after which a message in the client queue will expire.
+ */
+ private final int messageTimeToLive;
+
+ /**
+ * A listener which receives notifications about queues that are added or removed
+ */
+ private final ConnectionListener connectionListener;
+
+ private final CacheServerStats acceptorStats;
+
+ /**
+ * haContainer can hold either the name of the client-messages-region (in case of eviction
+ * policies "mem" or "entry") or an instance of HashMap (in case of eviction policy "none"). In
+ * both the cases, it'll store HAEventWrapper as its key and ClientUpdateMessage as its value.
+ */
+ private volatile HAContainerWrapper haContainer;
+
+ /**
+ * The size of the server-to-client communication socket buffers. This can be modified using the
+ * BridgeServer.SOCKET_BUFFER_SIZE system property.
+ */
+ private static final int socketBufferSize =
+ Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", 32768);
+
+ /**
+ * The statistics for this notifier
+ */
+ final CacheClientNotifierStats statistics; // TODO: pass statistics into CacheClientProxy then
+ // make private
+
+ /**
+ * The {@code InterestRegistrationListener} instances registered in this VM. This is used when
+ * modifying the set of listeners.
+ */
+ private final Set writableInterestRegistrationListeners = new CopyOnWriteArraySet();
+
+ /**
+ * The {@code InterestRegistrationListener} instances registered in this VM. This is used to
+ * provide a read-only {@code Set} of listeners.
+ */
+ private final Set readableInterestRegistrationListeners =
+ Collections.unmodifiableSet(this.writableInterestRegistrationListeners);
+
+ /**
+ * System property name for indicating how much frequently the "Queue full" message should be
+ * logged.
+ */
+ private static final String MAX_QUEUE_LOG_FREQUENCY =
+ DistributionConfig.GEMFIRE_PREFIX + "logFrequency.clientQueueReachedMaxLimit";
+
+ public static final long DEFAULT_LOG_FREQUENCY = 1000;
+
+ private static final String EVENT_ENQUEUE_WAIT_TIME_NAME =
+ DistributionConfig.GEMFIRE_PREFIX + "subscription.EVENT_ENQUEUE_WAIT_TIME";
+
+ private static final int DEFAULT_EVENT_ENQUEUE_WAIT_TIME = 100;
+
+ /**
+ * System property value denoting the time in milliseconds. Any thread putting an event into a
+ * subscription queue, which is full, will wait this much time for the queue to make space. It'll
+ * then enque the event possibly causing the queue to grow beyond its capacity/max-size. See
+ * #51400.
+ */
+ public static int eventEnqueueWaitTime; // TODO: encapsulate eventEnqueueWaitTime
+
+ /**
+ * The frequency of logging the "Queue full" message.
+ */
+ private long logFrequency = DEFAULT_LOG_FREQUENCY;
+
+ private final Map<String, DefaultQuery> compiledQueries = new ConcurrentHashMap<>();
+
+ private volatile boolean isCompiledQueryCleanupThreadStarted = false;
+
+ private final Object lockIsCompiledQueryCleanupThreadStarted = new Object();
+
+ private SystemTimer.SystemTimerTask clientPingTask; // TODO: fix synchronization of clientPingTask
+
+ private final SocketCloser socketCloser;
+
+ private static final long CLIENT_PING_TASK_PERIOD =
+ Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingPeriod", 60000);
+
+ /**
+ * package-private to avoid synthetic accessor
+ */
+ static final long CLIENT_PING_TASK_COUNTER =
+ Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingCounter", 3);
+
+ private final Set blackListedClients = new CopyOnWriteArraySet();
+
+ /**
+ * Factory method to construct a CacheClientNotifier {@code CacheClientNotifier} instance.
*
- * @param cache The GemFire <code>InternalCache</code>
- * @return A <code>CacheClientNotifier</code> instance
+ * @param cache The GemFire {@code InternalCache}
+ * @return A {@code CacheClientNotifier} instance
*/
public static synchronized CacheClientNotifier getInstance(InternalCache cache,
CacheServerStats acceptorStats, int maximumMessageCount, int messageTimeToLive,
ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver) {
if (ccnSingleton == null) {
ccnSingleton = new CacheClientNotifier(cache, acceptorStats, maximumMessageCount,
- messageTimeToLive, listener, overflowAttributesList, isGatewayReceiver);
+ messageTimeToLive, listener, isGatewayReceiver);
}
if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) {
@@ -158,20 +286,72 @@ public class CacheClientNotifier {
}
/**
+ * @param cache The GemFire {@code InternalCache}
+ * @param listener a listener which should receive notifications abouts queues being added or
+ * removed.
+ */
+ private CacheClientNotifier(InternalCache cache, CacheServerStats acceptorStats,
+ int maximumMessageCount, int messageTimeToLive, ConnectionListener listener,
+ boolean isGatewayReceiver) {
+ // Set the Cache
+ setCache(cache);
+ this.acceptorStats = acceptorStats;
+ // we only need one thread per client and wait 50ms for close
+ this.socketCloser = new SocketCloser(1, 50);
+
+ // Set the LogWriter
+ this.logWriter = (InternalLogWriter) cache.getLogger();
+
+ this.connectionListener = listener;
+
+ // Set the security LogWriter
+ this.securityLogWriter = (InternalLogWriter) cache.getSecurityLogger();
+
+ this.maximumMessageCount = maximumMessageCount;
+ this.messageTimeToLive = messageTimeToLive;
+
+ // Initialize the statistics
+ StatisticsFactory factory;
+ if (isGatewayReceiver) {
+ factory = new DummyStatisticsFactory();
+ } else {
+ factory = getCache().getDistributedSystem();
+ }
+ this.statistics = new CacheClientNotifierStats(factory);
+
+ try {
+ this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY));
+ if (this.logFrequency <= 0) {
+ this.logFrequency = DEFAULT_LOG_FREQUENCY;
+ }
+ } catch (Exception e) {
+ this.logFrequency = DEFAULT_LOG_FREQUENCY;
+ }
+
+ eventEnqueueWaitTime =
+ Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME);
+ if (eventEnqueueWaitTime < 0) {
+ eventEnqueueWaitTime = DEFAULT_EVENT_ENQUEUE_WAIT_TIME;
+ }
+
+ // Schedule task to periodically ping clients.
+ scheduleClientPingTask();
+ }
+
+ /**
* Writes a given message to the output stream
*
- * @param dos the <code>DataOutputStream</code> to use for writing the message
+ * @param dos the {@code DataOutputStream} to use for writing the message
* @param type a byte representing the message type
- * @param p_msg the message to be written; can be null
+ * @param message the message to be written; can be null
*/
- private void writeMessage(DataOutputStream dos, byte type, String p_msg, Version clientVersion)
+ private void writeMessage(DataOutputStream dos, byte type, String message, Version clientVersion)
throws IOException {
- writeMessage(dos, type, p_msg, clientVersion, (byte) 0x00, 0);
+ writeMessage(dos, type, message, clientVersion, (byte) 0x00, 0);
}
- private void writeMessage(DataOutputStream dos, byte type, String p_msg, Version clientVersion,
+ private void writeMessage(DataOutputStream dos, byte type, String message, Version clientVersion,
byte epType, int qSize) throws IOException {
- String msg = p_msg;
// write the message type
dos.writeByte(type);
@@ -181,6 +361,7 @@ public class CacheClientNotifier {
// dummy qSize
dos.writeInt(qSize);
+ String msg = message;
if (msg == null) {
msg = "";
}
@@ -188,10 +369,10 @@ public class CacheClientNotifier {
if (clientVersion != null && clientVersion.compareTo(Version.GFE_61) >= 0) {
// get all the instantiators.
Instantiator[] instantiators = InternalInstantiator.getInstantiators();
- HashMap instantiatorMap = new HashMap();
+ Map instantiatorMap = new HashMap();
if (instantiators != null && instantiators.length > 0) {
for (Instantiator instantiator : instantiators) {
- ArrayList instantiatorAttributes = new ArrayList();
+ List<String> instantiatorAttributes = new ArrayList<>();
instantiatorAttributes.add(instantiator.getClass().toString().substring(6));
instantiatorAttributes.add(instantiator.getInstantiatedClass().toString().substring(6));
instantiatorMap.put(instantiator.getId(), instantiatorAttributes);
@@ -201,15 +382,14 @@ public class CacheClientNotifier {
// get all the dataserializers.
DataSerializer[] dataSerializers = InternalDataSerializer.getSerializers();
- HashMap<Integer, ArrayList<String>> dsToSupportedClasses =
- new HashMap<Integer, ArrayList<String>>();
- HashMap<Integer, String> dataSerializersMap = new HashMap<Integer, String>();
+ Map<Integer, List<String>> dsToSupportedClasses = new HashMap<>();
+ Map<Integer, String> dataSerializersMap = new HashMap<>();
if (dataSerializers != null && dataSerializers.length > 0) {
for (DataSerializer dataSerializer : dataSerializers) {
dataSerializersMap.put(dataSerializer.getId(),
dataSerializer.getClass().toString().substring(6));
if (clientVersion.compareTo(Version.GFE_6516) >= 0) {
- ArrayList<String> supportedClassNames = new ArrayList<String>();
+ List<String> supportedClassNames = new ArrayList<>();
for (Class clazz : dataSerializer.getSupportedClasses()) {
supportedClassNames.add(clazz.getName());
}
@@ -228,7 +408,7 @@ public class CacheClientNotifier {
/**
* Writes an exception message to the socket
*
- * @param dos the <code>DataOutputStream</code> to use for writing the message
+ * @param dos the {@code DataOutputStream} to use for writing the message
* @param type a byte representing the exception type
* @param ex the exception to be written; should not be null
*/
@@ -245,7 +425,7 @@ public class CacheClientNotifier {
public void registerClient(Socket socket, boolean isPrimary, long acceptorId,
boolean notifyBySubscription) throws IOException {
// Since no remote ports were specified in the message, wait for them.
- long startTime = this._statistics.startTime();
+ long startTime = this.statistics.startTime();
DataInputStream dis = new DataInputStream(socket.getInputStream());
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
@@ -261,7 +441,7 @@ public class CacheClientNotifier {
SocketAddress sa = socket.getRemoteSocketAddress();
UnsupportedVersionException uve = e;
if (sa != null) {
- String sInfo = " Client: " + sa.toString() + ".";
+ String sInfo = " Client: " + sa + ".";
uve = new UnsupportedVersionException(e.getMessage() + sInfo);
}
logger.warn(
@@ -272,8 +452,7 @@ public class CacheClientNotifier {
return;
}
- // Read and ignore the reply code. This is used on the client to server
- // handshake.
+ // Read and ignore the reply code. This is used on the client to server handshake.
dis.readByte(); // replyCode
if (Version.GFE_57.compareTo(clientVersion) <= 0) {
@@ -289,7 +468,7 @@ public class CacheClientNotifier {
}
}
- protected void registerGFEClient(DataInputStream dis, DataOutputStream dos, Socket socket,
+ private void registerGFEClient(DataInputStream dis, DataOutputStream dos, Socket socket,
boolean isPrimary, long startTime, Version clientVersion, long acceptorId,
boolean notifyBySubscription) throws IOException {
// Read the ports and throw them away. We no longer need them
@@ -299,9 +478,6 @@ public class CacheClientNotifier {
}
// Read the handshake identifier and convert it to a string member id
ClientProxyMembershipID proxyID = null;
- CacheClientProxy proxy;
- AccessControl authzCallback = null;
- byte clientConflation = HandShake.CONFLATION_DEFAULT;
try {
proxyID = ClientProxyMembershipID.readCanonicalized(dis);
if (getBlacklistedClient().contains(proxyID)) {
@@ -309,13 +485,14 @@ public class CacheClientNotifier {
new Exception("This client is blacklisted by server"), clientVersion);
return;
}
- proxy = getClientProxy(proxyID);
+ CacheClientProxy proxy = getClientProxy(proxyID);
DistributedMember member = proxyID.getDistributedMember();
- DistributedSystem system = this.getCache().getDistributedSystem();
+ DistributedSystem system = getCache().getDistributedSystem();
Properties sysProps = system.getProperties();
String authenticator = sysProps.getProperty(SECURITY_CLIENT_AUTHENTICATOR);
+ byte clientConflation;
if (clientVersion.compareTo(Version.GFE_603) >= 0) {
byte[] overrides = HandShake.extractOverrides(new byte[] {(byte) dis.read()});
clientConflation = overrides[0];
@@ -339,27 +516,23 @@ public class CacheClientNotifier {
Properties credentials = HandShake.readCredentials(dis, dos, system);
if (credentials != null && proxy != null) {
- if (securityLogWriter.fineEnabled()) {
- securityLogWriter
+ if (this.securityLogWriter.fineEnabled()) {
+ this.securityLogWriter
.fine("CacheClientNotifier: verifying credentials for proxyID: " + proxyID);
}
Object subject = HandShake.verifyCredentials(authenticator, credentials,
system.getSecurityProperties(), this.logWriter, this.securityLogWriter, member);
if (subject instanceof Principal) {
Principal principal = (Principal) subject;
- if (securityLogWriter.fineEnabled()) {
- securityLogWriter
+ if (this.securityLogWriter.fineEnabled()) {
+ this.securityLogWriter
.fine("CacheClientNotifier: successfully verified credentials for proxyID: "
+ proxyID + " having principal: " + principal.getName());
}
String postAuthzFactoryName = sysProps.getProperty(SECURITY_CLIENT_ACCESSOR_PP);
- if (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) {
- if (principal == null) {
- securityLogWriter.warning(
- LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_POST_PROCESS_AUTHORIZATION_CALLBACK_ENABLED_BUT_AUTHENTICATION_CALLBACK_0_RETURNED_WITH_NULL_CREDENTIALS_FOR_PROXYID_1,
- new Object[] {SECURITY_CLIENT_AUTHENTICATOR, proxyID});
- }
+ AccessControl authzCallback = null;
+ if (postAuthzFactoryName != null && !postAuthzFactoryName.isEmpty()) {
Method authzMethod = ClassLoadUtil.methodFromName(postAuthzFactoryName);
authzCallback = (AccessControl) authzMethod.invoke(null, (Object[]) null);
authzCallback.init(principal, member, this.getCache());
@@ -374,13 +547,13 @@ public class CacheClientNotifier {
LocalizedStrings.CacheClientNotifier_CLIENTPROXYMEMBERSHIPID_OBJECT_COULD_NOT_BE_CREATED_EXCEPTION_OCCURRED_WAS_0
.toLocalizedString(e));
} catch (AuthenticationRequiredException ex) {
- securityLogWriter.warning(
+ this.securityLogWriter.warning(
LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1,
new Object[] {proxyID, ex});
writeException(dos, HandShake.REPLY_EXCEPTION_AUTHENTICATION_REQUIRED, ex, clientVersion);
return;
} catch (AuthenticationFailedException ex) {
- securityLogWriter.warning(
+ this.securityLogWriter.warning(
LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1,
new Object[] {proxyID, ex});
writeException(dos, HandShake.REPLY_EXCEPTION_AUTHENTICATION_FAILED, ex, clientVersion);
@@ -389,11 +562,10 @@ public class CacheClientNotifier {
logger.warn(LocalizedMessage.create(
LocalizedStrings.CacheClientNotifier_0_REGISTERCLIENT_EXCEPTION_ENCOUNTERED_IN_REGISTRATION_1,
new Object[] {this, e}), e);
- IOException io = new IOException(
+ throw new IOException(
LocalizedStrings.CacheClientNotifier_EXCEPTION_OCCURRED_WHILE_TRYING_TO_REGISTER_INTEREST_DUE_TO_0
- .toLocalizedString(e.getMessage()));
- io.initCause(e);
- throw io;
+ .toLocalizedString(e.getMessage()),
+ e);
} catch (Exception ex) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1,
@@ -402,7 +574,7 @@ public class CacheClientNotifier {
return;
}
- this._statistics.endClientRegistration(startTime);
+ this.statistics.endClientRegistration(startTime);
}
/**
@@ -410,14 +582,13 @@ public class CacheClientNotifier {
*
* @param socket The socket over which the server communicates with the client.
* @param proxyId The distributed member id of the client being registered
- * @param proxy The <code>CacheClientProxy</code> of the given <code>proxyId</code>
+ * @param proxy The {@code CacheClientProxy} of the given {@code proxyId}
*
* @return CacheClientProxy for the registered client
*/
private CacheClientProxy registerClient(Socket socket, ClientProxyMembershipID proxyId,
CacheClientProxy proxy, boolean isPrimary, byte clientConflation, Version clientVersion,
long acceptorId, boolean notifyBySubscription) throws IOException, CacheException {
- CacheClientProxy l_proxy = proxy;
// Initialize the socket
socket.setTcpNoDelay(true);
@@ -431,9 +602,6 @@ public class CacheClientNotifier {
}
// Determine whether the client is durable or not.
- byte responseByte = Acceptor.SUCCESSFUL_SERVER_TO_CLIENT;
- String unsuccessfulMsg = null;
- boolean successful = true;
boolean clientIsDurable = proxyId.isDurable();
if (logger.isDebugEnabled()) {
if (clientIsDurable) {
@@ -446,8 +614,11 @@ public class CacheClientNotifier {
byte epType = 0x00;
int qSize = 0;
+ byte responseByte = Acceptor.SUCCESSFUL_SERVER_TO_CLIENT;
+ String unsuccessfulMsg = null;
+ boolean successful = true;
if (clientIsDurable) {
- if (l_proxy == null) {
+ if (proxy == null) {
if (isTimedOut(proxyId)) {
qSize = PoolImpl.PRIMARY_QUEUE_TIMED_OUT;
} else {
@@ -459,9 +630,9 @@ public class CacheClientNotifier {
"CacheClientNotifier: No proxy exists for durable client with id {}. It must be created.",
proxyId.getDurableId());
}
- l_proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation,
+ proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation,
clientVersion, acceptorId, notifyBySubscription);
- successful = this.initializeProxy(l_proxy);
+ successful = this.initializeProxy(proxy);
} else {
if (proxy.isPrimary()) {
epType = (byte) 2;
@@ -470,27 +641,27 @@ public class CacheClientNotifier {
}
qSize = proxy.getQueueSize();
// A proxy exists for this durable client. It must be reinitialized.
- if (l_proxy.isPaused()) {
+ if (proxy.isPaused()) {
if (CacheClientProxy.testHook != null) {
CacheClientProxy.testHook.doTestHook("CLIENT_PRE_RECONNECT");
}
- if (l_proxy.lockDrain()) {
+ if (proxy.lockDrain()) {
try {
if (logger.isDebugEnabled()) {
logger.debug(
"CacheClientNotifier: A proxy exists for durable client with id {}. This proxy will be reinitialized: {}",
- proxyId.getDurableId(), l_proxy);
+ proxyId.getDurableId(), proxy);
}
- this._statistics.incDurableReconnectionCount();
- l_proxy.getProxyID().updateDurableTimeout(proxyId.getDurableTimeout());
- l_proxy.reinitialize(socket, proxyId, this.getCache(), isPrimary, clientConflation,
+ this.statistics.incDurableReconnectionCount();
+ proxy.getProxyID().updateDurableTimeout(proxyId.getDurableTimeout());
+ proxy.reinitialize(socket, proxyId, this.getCache(), isPrimary, clientConflation,
clientVersion);
- l_proxy.setMarkerEnqueued(true);
+ proxy.setMarkerEnqueued(true);
if (CacheClientProxy.testHook != null) {
CacheClientProxy.testHook.doTestHook("CLIENT_RECONNECTED");
}
} finally {
- l_proxy.unlockDrain();
+ proxy.unlockDrain();
}
} else {
unsuccessfulMsg =
@@ -507,7 +678,7 @@ public class CacheClientNotifier {
// client is already using this durable id.
unsuccessfulMsg =
LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_THE_REQUESTED_DURABLE_CLIENT_HAS_THE_SAME_IDENTIFIER__0__AS_AN_EXISTING_DURABLE_CLIENT__1__DUPLICATE_DURABLE_CLIENTS_ARE_NOT_ALLOWED
- .toLocalizedString(new Object[] {proxyId.getDurableId(), proxy});
+ .toLocalizedString(proxyId.getDurableId(), proxy);
logger.warn(unsuccessfulMsg);
// Set the unsuccessful response byte.
responseByte = HandShake.REPLY_EXCEPTION_DUPLICATE_DURABLE_CLIENT;
@@ -537,18 +708,18 @@ public class CacheClientNotifier {
if (toCreateNewProxy) {
// Create the new proxy for this non-durable client
- l_proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation,
+ proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation,
clientVersion, acceptorId, notifyBySubscription);
- successful = this.initializeProxy(l_proxy);
+ successful = this.initializeProxy(proxy);
}
}
if (!successful) {
- l_proxy = null;
+ proxy = null;
responseByte = HandShake.REPLY_REFUSED;
unsuccessfulMsg =
LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_A_PREVIOUS_CONNECTION_ATTEMPT_FROM_THIS_CLIENT_IS_STILL_BEING_PROCESSED__0
- .toLocalizedString(new Object[] {proxyId});
+ .toLocalizedString(proxyId);
}
// Tell the client that the proxy has been registered using the response
@@ -562,10 +733,10 @@ public class CacheClientNotifier {
// write the message type, message length and the error message (if any)
writeMessage(dos, responseByte, unsuccessfulMsg, clientVersion, epType, qSize);
} catch (IOException ioe) {// remove the added proxy if we get IOException.
- if (l_proxy != null) {
- boolean keepProxy = l_proxy.close(false, false); // do not check for queue, just close it
+ if (proxy != null) {
+ boolean keepProxy = proxy.close(false, false); // do not check for queue, just close it
if (!keepProxy) {
- removeClientProxy(l_proxy);
+ removeClientProxy(proxy);
}
}
throw ioe;
@@ -580,41 +751,39 @@ public class CacheClientNotifier {
// will ensure that the response byte is sent to the client before
// the marker message. If the client is durable, the message processor
// is not started until the clientReady message is received.
- if (!clientIsDurable && l_proxy != null
- && responseByte == Acceptor.SUCCESSFUL_SERVER_TO_CLIENT) {
+ if (!clientIsDurable && proxy != null && responseByte == Acceptor.SUCCESSFUL_SERVER_TO_CLIENT) {
// The startOrResumeMessageDispatcher tests if the proxy is a primary.
// If this is a secondary proxy, the dispatcher is not started.
// The false parameter signifies that a marker message has not already been
// processed. This will generate and send one.
- l_proxy.startOrResumeMessageDispatcher(false);
+ proxy.startOrResumeMessageDispatcher(false);
}
if (responseByte == Acceptor.SUCCESSFUL_SERVER_TO_CLIENT) {
if (logger.isDebugEnabled()) {
- logger.debug("CacheClientNotifier: Successfully registered {}", l_proxy);
+ logger.debug("CacheClientNotifier: Successfully registered {}", proxy);
}
} else {
logger.warn(LocalizedMessage.create(
LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_UNSUCCESSFULLY_REGISTERED_CLIENT_WITH_IDENTIFIER__0,
proxyId));
}
- return l_proxy;
+ return proxy;
}
- private boolean initializeProxy(CacheClientProxy l_proxy) throws IOException, CacheException {
- boolean status = false;
- if (!this.isProxyInInitializationMode(l_proxy)) {
+ private boolean initializeProxy(CacheClientProxy proxy) throws CacheException {
+ if (!this.isProxyInInitializationMode(proxy)) {
if (logger.isDebugEnabled()) {
- logger.debug("Initializing proxy: {}", l_proxy);
+ logger.debug("Initializing proxy: {}", proxy);
}
try {
// Add client proxy to initialization list. This has to be done before
// the queue is created so that events can be buffered here for delivery
// to the queue once it's initialized (bug #41681 and others)
- addClientInitProxy(l_proxy);
- l_proxy.initializeMessageDispatcher();
+ addClientInitProxy(proxy);
+ proxy.initializeMessageDispatcher();
// Initialization success. Add to client proxy list.
- addClientProxy(l_proxy);
+ addClientProxy(proxy);
return true;
} catch (RegionExistsException ree) {
if (logger.isDebugEnabled()) {
@@ -624,10 +793,10 @@ public class CacheClientNotifier {
}
// This will return false;
} finally {
- removeClientInitProxy(l_proxy);
+ removeClientInitProxy(proxy);
}
}
- return status;
+ return false;
}
/**
@@ -670,9 +839,9 @@ public class CacheClientNotifier {
boolean success = false;
CacheClientProxy proxy = getClientProxy(proxyId);
if (proxy != null) {
- HARegionQueue harq = proxy.getHARegionQueue();
- harq.addDispatchedMessage(new ThreadIdentifier(eid.getMembershipID(), eid.getThreadID()),
- eid.getSequenceID());
+ HARegionQueue haRegionQueue = proxy.getHARegionQueue();
+ haRegionQueue.addDispatchedMessage(
+ new ThreadIdentifier(eid.getMembershipID(), eid.getThreadID()), eid.getSequenceID());
success = true;
}
return success;
@@ -690,11 +859,6 @@ public class CacheClientNotifier {
}
CacheClientProxy proxy = getClientProxy(membershipID);
if (proxy != null) {
- // Close the port if the proxy represents the client and contains the
- // port)
- // // If so, remove the port from the client's remote ports
- // proxy.removePort(clientPort);
- // Set the keepalive flag
proxy.setKeepAlive(keepAlive);
}
}
@@ -704,7 +868,7 @@ public class CacheClientNotifier {
*
* @param memberId Uniquely identifies the client
*/
- public void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) {
+ void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) {
if (logger.isDebugEnabled()) {
logger.debug("CacheClientNotifier: Unregistering all clients with member id: {}", memberId);
}
@@ -769,14 +933,16 @@ public class CacheClientNotifier {
* notify interested clients of the given cache event using the given update message. The event
* should have routing information in it that determines which clients will receive the event.
*/
- public static void notifyClients(InternalCacheEvent event, ClientUpdateMessage cmsg) {
+ public static void notifyClients(InternalCacheEvent event,
+ ClientUpdateMessage clientUpdateMessage) {
CacheClientNotifier instance = ccnSingleton;
if (instance != null) {
- instance.singletonNotifyClients(event, cmsg);
+ instance.singletonNotifyClients(event, clientUpdateMessage);
}
}
- private void singletonNotifyClients(InternalCacheEvent event, ClientUpdateMessage cmsg) {
+ private void singletonNotifyClients(InternalCacheEvent event,
+ ClientUpdateMessage clientUpdateMessage) {
final boolean isDebugEnabled = logger.isDebugEnabled();
final boolean isTraceEnabled = logger.isTraceEnabled();
@@ -796,20 +962,20 @@ public class CacheClientNotifier {
return;
}
- long startTime = this._statistics.startTime();
+ long startTime = this.statistics.startTime();
ClientUpdateMessageImpl clientMessage;
- if (cmsg == null) {
+ if (clientUpdateMessage == null) {
clientMessage = constructClientMessage(event);
} else {
- clientMessage = (ClientUpdateMessageImpl) cmsg;
+ clientMessage = (ClientUpdateMessageImpl) clientUpdateMessage;
}
if (clientMessage == null) {
return;
}
// Holds the clientIds to which filter message needs to be sent.
- Set<ClientProxyMembershipID> filterClients = new HashSet();
+ Set<ClientProxyMembershipID> filterClients = new HashSet<>();
// Add CQ info.
if (filterInfo.getCQs() != null) {
@@ -868,7 +1034,7 @@ public class CacheClientNotifier {
}
}
- Conflatable conflatable = null;
+ Conflatable conflatable;
if (clientMessage instanceof ClientTombstoneMessage) {
// bug #46832 - HAEventWrapper deserialization can't handle subclasses
@@ -893,7 +1059,7 @@ public class CacheClientNotifier {
singletonRouteClientMessage(conflatable, filterClients);
- this._statistics.endEvent(startTime);
+ this.statistics.endEvent(startTime);
// Cleanup destroyed events in CQ result cache.
// While maintaining the CQ results key caching. the destroy event
@@ -915,7 +1081,7 @@ public class CacheClientNotifier {
String cqName = regionProfile.getRealCqID(cqID);
if (cqName != null) {
ServerCQ cq = regionProfile.getCq(cqName);
- if (cq != null && e.getValue().equals(Integer.valueOf(MessageType.LOCAL_DESTROY))) {
+ if (cq != null && e.getValue().equals(MessageType.LOCAL_DESTROY)) {
cq.removeFromCqResultKeys(entryEvent.getKey(), true);
}
}
@@ -930,28 +1096,15 @@ public class CacheClientNotifier {
public static void routeClientMessage(Conflatable clientMessage) {
CacheClientNotifier instance = ccnSingleton;
if (instance != null) {
- instance.singletonRouteClientMessage(clientMessage, instance._clientProxies.keySet()); // ok
- // to
- // use
- // keySet
- // here
- // because
- // all
- // we
- // do
- // is
- // call
- // getClientProxy
- // with
- // these
- // keys
+ // ok to use keySet here because all we do is call getClientProxy with these keys
+ instance.singletonRouteClientMessage(clientMessage, instance.clientProxies.keySet());
}
}
/**
* this is for server side registration of client queue
*/
- public static void routeSingleClientMessage(ClientUpdateMessage clientMessage,
+ static void routeSingleClientMessage(ClientUpdateMessage clientMessage,
ClientProxyMembershipID clientProxyMembershipId) {
CacheClientNotifier instance = ccnSingleton;
if (instance != null) {
@@ -963,27 +1116,25 @@ public class CacheClientNotifier {
private void singletonRouteClientMessage(Conflatable conflatable,
Collection<ClientProxyMembershipID> filterClients) {
- this._cache.getCancelCriterion().checkCancelInProgress(null); // bug #43942 - client notified
- // but no p2p distribution
+ this.cache.getCancelCriterion().checkCancelInProgress(null);
List<CacheClientProxy> deadProxies = null;
for (ClientProxyMembershipID clientId : filterClients) {
- CacheClientProxy proxy;
- proxy = this.getClientProxy(clientId, true);
+ CacheClientProxy proxy = this.getClientProxy(clientId, true);
if (proxy != null) {
if (proxy.isAlive() || proxy.isPaused() || proxy.isConnected() || proxy.isDurable()) {
proxy.deliverMessage(conflatable);
} else {
proxy.getStatistics().incMessagesFailedQueued();
if (deadProxies == null) {
- deadProxies = new ArrayList<CacheClientProxy>();
+ deadProxies = new ArrayList<>();
}
deadProxies.add(proxy);
}
- this.blackListSlowReciever(proxy);
+ this.blackListSlowReceiver(proxy);
}
}
- checkAndRemoveFromClientMsgsRegion(conflatable);
+ checkAndRemoveFromClientMessagesRegion(conflatable);
// Remove any dead clients from the clients to notify
if (deadProxies != null) {
closeDeadProxies(deadProxies, false);
@@ -994,7 +1145,7 @@ public class CacheClientNotifier {
* processes the given collection of durable and non-durable client identifiers, returning a
* collection of non-durable identifiers of clients connected to this VM
*/
- public Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs) {
+ Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs) {
return getProxyIDs(mixedDurableAndNonDurableIDs, false);
}
@@ -1003,52 +1154,44 @@ public class CacheClientNotifier {
* collection of non-durable identifiers of clients connected to this VM. This version can check
* for proxies in initialization as well as fully initialized proxies.
*/
- public Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs,
+ private Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs,
boolean proxyInInitMode) {
- Set<ClientProxyMembershipID> result = new HashSet();
+ Set<ClientProxyMembershipID> result = new HashSet<>();
for (Object id : mixedDurableAndNonDurableIDs) {
if (id instanceof String) {
CacheClientProxy clientProxy = getClientProxy((String) id, true);
if (clientProxy != null) {
result.add(clientProxy.getProxyID());
}
- // else { we don't have a proxy for the given durable ID }
} else {
// try to canonicalize the ID.
CacheClientProxy proxy = getClientProxy((ClientProxyMembershipID) id, true);
if (proxy != null) {
- // this._logger.info(LocalizedStrings.DEBUG, "BRUCE: found match for " + id + ": " +
- // proxy.getProxyID());
result.add(proxy.getProxyID());
- } else {
- // this._logger.info(LocalizedStrings.DEBUG, "BRUCE: did not find match for " + id);
- // this was causing OOMEs in HARegion initial image processing because
- // messages had routing for clients unknown to this server
- // result.add((ClientProxyMembershipID)id);
}
}
}
return result;
}
- private void blackListSlowReciever(CacheClientProxy clientProxy) {
+ private void blackListSlowReceiver(CacheClientProxy clientProxy) {
final CacheClientProxy proxy = clientProxy;
- if ((proxy.getHARegionQueue() != null && proxy.getHARegionQueue().isClientSlowReciever())
- && !blackListedClients.contains(proxy.getProxyID())) {
+ if (proxy.getHARegionQueue() != null && proxy.getHARegionQueue().isClientSlowReciever()
+ && !this.blackListedClients.contains(proxy.getProxyID())) {
// log alert with client info.
logger.warn(
LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_CLIENT_0_IS_A_SLOW_RECEIVER,
new Object[] {proxy.getProxyID()}));
addToBlacklistedClient(proxy.getProxyID());
- InternalDistributedSystem ids =
- (InternalDistributedSystem) this.getCache().getDistributedSystem();
- final DM dm = ids.getDistributionManager();
+ InternalDistributedSystem system = getCache().getInternalDistributedSystem();
+ final DM dm = system.getDistributionManager();
+
dm.getWaitingThreadPool().execute(new Runnable() {
+ @Override
public void run() {
CacheDistributionAdvisor advisor =
- ((DistributedRegion) proxy.getHARegionQueue().getRegion())
- .getCacheDistributionAdvisor();
+ proxy.getHARegionQueue().getRegion().getCacheDistributionAdvisor();
Set members = advisor.adviseCacheOp();
// Send client blacklist message
@@ -1074,25 +1217,24 @@ public class CacheClientNotifier {
}
/**
- * Initializes a <code>ClientUpdateMessage</code> from an operation and event
+ * Initializes a {@code ClientUpdateMessage} from an operation and event
*
* @param operation The operation that occurred (e.g. AFTER_CREATE)
* @param event The event containing the data to be updated
- * @return a <code>ClientUpdateMessage</code>
+ * @return a {@code ClientUpdateMessage}
*/
- private ClientUpdateMessageImpl initializeMessage(EnumListenerEvent operation, CacheEvent event)
- throws Exception {
+ private ClientUpdateMessageImpl initializeMessage(EnumListenerEvent operation, CacheEvent event) {
if (!supportsOperation(operation)) {
- throw new Exception(
+ throw new UnsupportedOperationException(
LocalizedStrings.CacheClientNotifier_THE_CACHE_CLIENT_NOTIFIER_DOES_NOT_SUPPORT_OPERATIONS_OF_TYPE_0
.toLocalizedString(operation));
}
- // String regionName = event.getRegion().getFullPath();
+
Object keyOfInterest = null;
final EventID eventIdentifier;
ClientProxyMembershipID membershipID = null;
boolean isNetLoad = false;
- Object callbackArgument = null;
+ Object callbackArgument;
byte[] delta = null;
VersionTag versionTag = null;
@@ -1129,19 +1271,19 @@ public class CacheClientNotifier {
}
if (isNetLoad) {
- clientUpdateMsg.setIsNetLoad(isNetLoad);
+ clientUpdateMsg.setIsNetLoad(true);
}
return clientUpdateMsg;
}
/**
- * Returns whether the <code>CacheClientNotifier</code> supports the input operation.
+ * Returns whether the {@code CacheClientNotifier} supports the input operation.
*
* @param operation The operation that occurred (e.g. AFTER_CREATE)
- * @return whether the <code>CacheClientNotifier</code> supports the input operation
+ * @return whether the {@code CacheClientNotifier} supports the input operation
*/
- protected boolean supportsOperation(EnumListenerEvent operation) {
+ private boolean supportsOperation(EnumListenerEvent operation) {
return operation == EnumListenerEvent.AFTER_CREATE
|| operation == EnumListenerEvent.AFTER_UPDATE
|| operation == EnumListenerEvent.AFTER_DESTROY
@@ -1211,7 +1353,7 @@ public class CacheClientNotifier {
int regionDataPolicy) {
if (regionDataPolicy == 0) {
if (!regionsWithEmptyDataPolicy.containsKey(regionName)) {
- regionsWithEmptyDataPolicy.put(regionName, Integer.valueOf(0));
+ regionsWithEmptyDataPolicy.put(regionName, 0);
}
}
}
@@ -1222,8 +1364,8 @@ public class CacheClientNotifier {
* @param regionName The name of the region of interest
* @param keyOfInterest The name of the key of interest
* @param isClosing Whether the caller is closing
- * @param membershipID The <code>ClientProxyMembershipID</code> of the client no longer interested
- * in this <code>Region</code> and key
+ * @param membershipID The {@code ClientProxyMembershipID} of the client no longer interested in
+ * this {@code Region} and key
*/
public void unregisterClientInterest(String regionName, Object keyOfInterest, int interestType,
boolean isClosing, ClientProxyMembershipID membershipID, boolean keepalive) {
@@ -1244,8 +1386,8 @@ public class CacheClientNotifier {
*
* @param regionName The name of the region of interest
* @param keysOfInterest The list of keys of interest
- * @param membershipID The <code>ClientProxyMembershipID</code> of the client no longer interested
- * in this <code>Region</code> and key
+ * @param membershipID The {@code ClientProxyMembershipID} of the client no longer interested in
+ * this {@code Region} and key
*/
public void registerClientInterest(String regionName, List keysOfInterest,
ClientProxyMembershipID membershipID, boolean isDurable, boolean sendUpdatesAsInvalidates,
@@ -1278,8 +1420,8 @@ public class CacheClientNotifier {
* @param regionName The name of the region of interest
* @param keysOfInterest The list of keys of interest
* @param isClosing Whether the caller is closing
- * @param membershipID The <code>ClientProxyMembershipID</code> of the client no longer interested
- * in this <code>Region</code> and key
+ * @param membershipID The {@code ClientProxyMembershipID} of the client no longer interested in
+ * this {@code Region} and key
*/
public void unregisterClientInterest(String regionName, List keysOfInterest, boolean isClosing,
ClientProxyMembershipID membershipID, boolean keepalive) {
@@ -1301,21 +1443,22 @@ public class CacheClientNotifier {
*
* @since GemFire 5.7
*/
- private void checkAndRemoveFromClientMsgsRegion(Conflatable conflatable) {
- if (haContainer == null) {
+ private void checkAndRemoveFromClientMessagesRegion(Conflatable conflatable) {
+ if (this.haContainer == null) {
return;
}
+
if (conflatable instanceof HAEventWrapper) {
HAEventWrapper wrapper = (HAEventWrapper) conflatable;
if (!wrapper.getIsRefFromHAContainer()) {
- wrapper = (HAEventWrapper) haContainer.getKey(wrapper);
+ wrapper = (HAEventWrapper) this.haContainer.getKey(wrapper);
if (wrapper != null && !wrapper.getPutInProgress()) {
synchronized (wrapper) {
if (wrapper.getReferenceCount() == 0L) {
if (logger.isDebugEnabled()) {
logger.debug("Removing event from haContainer: {}", wrapper);
}
- haContainer.remove(wrapper);
+ this.haContainer.remove(wrapper);
}
}
}
@@ -1328,7 +1471,7 @@ public class CacheClientNotifier {
if (logger.isDebugEnabled()) {
logger.debug("Removing event from haContainer: {}", wrapper);
}
- haContainer.remove(wrapper);
+ this.haContainer.remove(wrapper);
}
}
}
@@ -1336,12 +1479,12 @@ public class CacheClientNotifier {
}
/**
- * Returns the <code>CacheClientProxy</code> associated to the membershipID *
+ * Returns the {@code CacheClientProxy} associated to the membershipID *
*
- * @return the <code>CacheClientProxy</code> associated to the membershipID
+ * @return the {@code CacheClientProxy} associated to the membershipID
*/
public CacheClientProxy getClientProxy(ClientProxyMembershipID membershipID) {
- return (CacheClientProxy) this._clientProxies.get(membershipID);
+ return (CacheClientProxy) this.clientProxies.get(membershipID);
}
/**
@@ -1352,25 +1495,25 @@ public class CacheClientNotifier {
boolean proxyInInitMode) {
CacheClientProxy proxy = getClientProxy(membershipID);
if (proxyInInitMode && proxy == null) {
- proxy = (CacheClientProxy) this._initClientProxies.get(membershipID);
+ proxy = (CacheClientProxy) this.initClientProxies.get(membershipID);
}
return proxy;
}
/**
- * Returns the <code>CacheClientProxy</code> associated to the durableClientId
+ * Returns the {@code CacheClientProxy} associated to the durableClientId
*
- * @return the <code>CacheClientProxy</code> associated to the durableClientId
+ * @return the {@code CacheClientProxy} associated to the durableClientId
*/
public CacheClientProxy getClientProxy(String durableClientId) {
return getClientProxy(durableClientId, false);
}
/**
- * Returns the <code>CacheClientProxy</code> associated to the durableClientId. This version of
- * the method can check for initializing proxies as well as fully initialized proxies.
+ * Returns the {@code CacheClientProxy} associated to the durableClientId. This version of the
+ * method can check for initializing proxies as well as fully initialized proxies.
*
- * @return the <code>CacheClientProxy</code> associated to the durableClientId
+ * @return the {@code CacheClientProxy} associated to the durableClientId
*/
public CacheClientProxy getClientProxy(String durableClientId, boolean proxyInInitMode) {
final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -1379,9 +1522,9 @@ public class CacheClientNotifier {
if (isDebugEnabled) {
logger.debug("CacheClientNotifier: Determining client for {}", durableClientId);
}
+
CacheClientProxy proxy = null;
- for (Iterator i = getClientProxies().iterator(); i.hasNext();) {
- CacheClientProxy clientProxy = (CacheClientProxy) i.next();
+ for (CacheClientProxy clientProxy : getClientProxies()) {
if (isTraceEnabled) {
logger.trace("CacheClientNotifier: Checking client {}", clientProxy);
}
@@ -1394,9 +1537,10 @@ public class CacheClientNotifier {
break;
}
}
+
if (proxy == null && proxyInInitMode) {
- for (Iterator i = this._initClientProxies.values().iterator(); i.hasNext();) {
- CacheClientProxy clientProxy = (CacheClientProxy) i.next();
+ for (Object clientProxyObject : this.initClientProxies.values()) {
+ CacheClientProxy clientProxy = (CacheClientProxy) clientProxyObject;
if (isTraceEnabled) {
logger.trace("CacheClientNotifier: Checking initializing client {}", clientProxy);
}
@@ -1415,37 +1559,6 @@ public class CacheClientNotifier {
}
/**
- * Returns the <code>CacheClientProxySameDS</code> associated to the membershipID *
- *
- * @return the <code>CacheClientProxy</code> associated to the same distributed system
- */
- public CacheClientProxy getClientProxySameDS(ClientProxyMembershipID membershipID) {
- final boolean isDebugEnabled = logger.isDebugEnabled();
- if (isDebugEnabled) {
- logger.debug("{}::getClientProxySameDS(), Determining client for host {}", this,
- membershipID);
- logger.debug("{}::getClientProxySameDS(), Number of proxies in the Cache Clinet Notifier: {}",
- this, getClientProxies().size());
- }
- CacheClientProxy proxy = null;
- for (Iterator i = getClientProxies().iterator(); i.hasNext();) {
- CacheClientProxy clientProxy = (CacheClientProxy) i.next();
- if (isDebugEnabled) {
- logger.debug("CacheClientNotifier: Checking client {}", clientProxy);
- }
- if (clientProxy.isSameDSMember(membershipID)) {
- proxy = clientProxy;
- if (isDebugEnabled) {
- logger.debug("CacheClientNotifier: {} represents the client running on host {}", proxy,
- membershipID);
- }
- break;
- }
- }
- return proxy;
- }
-
- /**
* It will remove the clients connected to the passed acceptorId. If its the only server, shuts
* down this instance.
*/
@@ -1453,10 +1566,10 @@ public class CacheClientNotifier {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("At cache server shutdown time, the number of cache servers in the cache is {}",
- this.getCache().getCacheServers().size());
+ getCache().getCacheServers().size());
}
- Iterator it = this._clientProxies.values().iterator();
+ Iterator it = this.clientProxies.values().iterator();
// Close all the client proxies
while (it.hasNext()) {
CacheClientProxy proxy = (CacheClientProxy) it.next();
@@ -1478,27 +1591,27 @@ public class CacheClientNotifier {
if (noActiveServer() && ccnSingleton != null) {
ccnSingleton = null;
- if (haContainer != null) {
- haContainer.cleanUp();
+ if (this.haContainer != null) {
+ this.haContainer.cleanUp();
if (isDebugEnabled) {
- logger.debug("haContainer ({}) is now cleaned up.", haContainer.getName());
+ logger.debug("haContainer ({}) is now cleaned up.", this.haContainer.getName());
}
}
this.clearCompiledQueries();
- blackListedClients.clear();
+ this.blackListedClients.clear();
// cancel the ping task
this.clientPingTask.cancel();
// Close the statistics
- this._statistics.close();
+ this.statistics.close();
this.socketCloser.close();
}
}
private boolean noActiveServer() {
- for (CacheServer server : this.getCache().getCacheServers()) {
+ for (CacheServer server : getCache().getCacheServers()) {
if (server.isRunning()) {
return false;
}
@@ -1507,41 +1620,40 @@ public class CacheClientNotifier {
}
/**
- * Adds a new <code>CacheClientProxy</code> to the list of known client proxies
+ * Adds a new {@code CacheClientProxy} to the list of known client proxies
*
- * @param proxy The <code>CacheClientProxy</code> to add
+ * @param proxy The {@code CacheClientProxy} to add
*/
- protected void addClientProxy(CacheClientProxy proxy) throws IOException {
- // this._logger.info(LocalizedStrings.DEBUG, "adding client proxy " + proxy);
+ void addClientProxy(CacheClientProxy proxy) {
getCache(); // ensure cache reference is up to date so firstclient state is correct
- this._clientProxies.put(proxy.getProxyID(), proxy);
+ this.clientProxies.put(proxy.getProxyID(), proxy);
// Remove this proxy from the init proxy list.
removeClientInitProxy(proxy);
- this._connectionListener.queueAdded(proxy.getProxyID());
- if (!(proxy.clientConflation == HandShake.CONFLATION_ON)) {
+ this.connectionListener.queueAdded(proxy.getProxyID());
+ if (proxy.clientConflation != HandShake.CONFLATION_ON) {
// Delta not supported with conflation ON
- ClientHealthMonitor chm = ClientHealthMonitor.getInstance();
+ ClientHealthMonitor clientHealthMonitor = ClientHealthMonitor.getInstance();
/*
* #41788 - If the client connection init starts while cache/member is shutting down,
* ClientHealthMonitor.getInstance() might return null.
*/
- if (chm != null) {
- chm.numOfClientsPerVersion.incrementAndGet(proxy.getVersion().ordinal());
+ if (clientHealthMonitor != null) {
+ clientHealthMonitor.numOfClientsPerVersion.incrementAndGet(proxy.getVersion().ordinal());
}
}
this.timedOutDurableClientProxies.remove(proxy.getProxyID());
}
- protected void addClientInitProxy(CacheClientProxy proxy) throws IOException {
- this._initClientProxies.put(proxy.getProxyID(), proxy);
+ private void addClientInitProxy(CacheClientProxy proxy) {
+ this.initClientProxies.put(proxy.getProxyID(), proxy);
}
- protected void removeClientInitProxy(CacheClientProxy proxy) throws IOException {
- this._initClientProxies.remove(proxy.getProxyID());
+ private void removeClientInitProxy(CacheClientProxy proxy) {
+ this.initClientProxies.remove(proxy.getProxyID());
}
- protected boolean isProxyInInitializationMode(CacheClientProxy proxy) throws IOException {
- return this._initClientProxies.containsKey(proxy.getProxyID());
+ private boolean isProxyInInitializationMode(CacheClientProxy proxy) {
+ return this.initClientProxies.containsKey(proxy.getProxyID());
}
/**
@@ -1552,8 +1664,7 @@ public class CacheClientNotifier {
*/
public Set getActiveClients() {
Set clients = new HashSet();
- for (Iterator iter = getClientProxies().iterator(); iter.hasNext();) {
- CacheClientProxy proxy = (CacheClientProxy) iter.next();
+ for (CacheClientProxy proxy : getClientProxies()) {
if (proxy.hasRegisteredInterested()) {
ClientProxyMembershipID proxyID = proxy.getProxyID();
clients.add(proxyID);
@@ -1569,8 +1680,8 @@ public class CacheClientNotifier {
*/
public Map getAllClients() {
Map clients = new HashMap();
- for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) {
- CacheClientProxy proxy = (CacheClientProxy) iter.next();
+ for (final Object o : this.clientProxies.values()) {
+ CacheClientProxy proxy = (CacheClientProxy) o;
ClientProxyMembershipID proxyID = proxy.getProxyID();
clients.put(proxyID, new CacheClientStatus(proxyID));
}
@@ -1586,8 +1697,8 @@ public class CacheClientNotifier {
* @since GemFire 5.6
*/
public boolean hasDurableClient(String durableId) {
- for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) {
- CacheClientProxy proxy = (CacheClientProxy) iter.next();
+ for (Object clientProxyObject : this.clientProxies.values()) {
+ CacheClientProxy proxy = (CacheClientProxy) clientProxyObject;
ClientProxyMembershipID proxyID = proxy.getProxyID();
if (durableId.equals(proxyID.getDurableId())) {
return true;
@@ -1605,15 +1716,11 @@ public class CacheClientNotifier {
* @since GemFire 5.6
*/
public boolean hasPrimaryForDurableClient(String durableId) {
- for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) {
- CacheClientProxy proxy = (CacheClientProxy) iter.next();
+ for (Object clientProxyObject : this.clientProxies.values()) {
+ CacheClientProxy proxy = (CacheClientProxy) clientProxyObject;
ClientProxyMembershipID proxyID = proxy.getProxyID();
if (durableId.equals(proxyID.getDurableId())) {
- if (proxy.isPrimary()) {
- return true;
- } else {
- return false;
- }
+ return proxy.isPrimary();
}
}
return false;
@@ -1626,9 +1733,9 @@ public class CacheClientNotifier {
*/
public Map getClientQueueSizes() {
Map/* <ClientProxyMembershipID,Integer> */ queueSizes = new HashMap();
- for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) {
- CacheClientProxy proxy = (CacheClientProxy) iter.next();
- queueSizes.put(proxy.getProxyID(), Integer.valueOf(proxy.getQueueSize()));
+ for (Object clientProxyObject : this.clientProxies.values()) {
+ CacheClientProxy proxy = (CacheClientProxy) clientProxyObject;
+ queueSizes.put(proxy.getProxyID(), proxy.getQueueSize());
}
return queueSizes;
}
@@ -1645,25 +1752,20 @@ public class CacheClientNotifier {
public boolean closeClientCq(String durableClientId, String clientCQName) throws CqException {
CacheClientProxy proxy = getClientProxy(durableClientId);
// close and drain
- if (proxy != null) {
- return proxy.closeClientCq(clientCQName);
- }
- return false;
+ return proxy != null && proxy.closeClientCq(clientCQName);
}
/**
- * Removes an existing <code>CacheClientProxy</code> from the list of known client proxies
+ * Removes an existing {@code CacheClientProxy} from the list of known client proxies
*
- * @param proxy The <code>CacheClientProxy</code> to remove
+ * @param proxy The {@code CacheClientProxy} to remove
*/
- protected void removeClientProxy(CacheClientProxy proxy) {
- // this._logger.info(LocalizedStrings.DEBUG, "removing client proxy " + proxy, new
- // Exception("stack trace"));
+ void removeClientProxy(CacheClientProxy proxy) {
ClientProxyMembershipID client = proxy.getProxyID();
- this._clientProxies.remove(client);
- this._connectionListener.queueRemoved();
- this.getCache().cleanupForClient(this, client);
- if (!(proxy.clientConflation == HandShake.CONFLATION_ON)) {
+ this.clientProxies.remove(client);
+ this.connectionListener.queueRemoved();
+ getCache().cleanupForClient(this, client);
+ if (proxy.clientConflation != HandShake.CONFLATION_ON) {
ClientHealthMonitor chm = ClientHealthMonitor.getInstance();
if (chm != null) {
chm.numOfClientsPerVersion.decrementAndGet(proxy.getVersion().ordinal());
@@ -1675,18 +1777,18 @@ public class CacheClientNotifier {
this.timedOutDurableClientProxies.add(client);
}
- public boolean isTimedOut(ClientProxyMembershipID client) {
+ private boolean isTimedOut(ClientProxyMembershipID client) {
return this.timedOutDurableClientProxies.contains(client);
}
/**
- * Returns an unmodifiable Collection of known <code>CacheClientProxy</code> instances. The
- * collection is not static so its contents may change.
+ * Returns an unmodifiable Collection of known {@code CacheClientProxy} instances. The collection
+ * is not static so its contents may change.
*
- * @return the collection of known <code>CacheClientProxy</code> instances
+ * @return the collection of known {@code CacheClientProxy} instances
*/
public Collection<CacheClientProxy> getClientProxies() {
- return Collections.unmodifiableCollection(this._clientProxies.values());
+ return Collections.unmodifiableCollection(this.clientProxies.values());
}
private void closeAllClientCqs(CacheClientProxy proxy) {
@@ -1698,12 +1800,12 @@ public class CacheClientNotifier {
logger.debug("CacheClientNotifier: Closing client CQs: {}", proxy);
}
cqService.closeClientCqs(proxy.getProxyID());
- } catch (CqException e1) {
+ } catch (CqException e) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.CacheClientNotifier_UNABLE_TO_CLOSE_CQS_FOR_THE_CLIENT__0,
proxy.getProxyID()));
if (isDebugEnabled) {
- e1.printStackTrace();
+ logger.debug(e);
}
}
}
@@ -1732,16 +1834,17 @@ public class CacheClientNotifier {
}
/**
- * Close dead <code>CacheClientProxy</code> instances
+ * Close dead {@code CacheClientProxy} instances
*
- * @param deadProxies The list of <code>CacheClientProxy</code> instances to close
+ * @param deadProxies The list of {@code CacheClientProxy} instances to close
*/
private void closeDeadProxies(List deadProxies, boolean stoppedNormally) {
final boolean isDebugEnabled = logger.isDebugEnabled();
- for (Iterator i = deadProxies.iterator(); i.hasNext();) {
- CacheClientProxy proxy = (CacheClientProxy) i.next();
- if (isDebugEnabled)
+ for (Object deadProxy : deadProxies) {
+ CacheClientProxy proxy = (CacheClientProxy) deadProxy;
+ if (isDebugEnabled) {
logger.debug("CacheClientNotifier: Closing dead client: {}", proxy);
+ }
// Close the proxy
boolean keepProxy = false;
@@ -1757,8 +1860,7 @@ public class CacheClientNotifier {
if (keepProxy) {
logger.info(LocalizedMessage.create(
LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_KEEPING_PROXY_FOR_DURABLE_CLIENT_NAMED_0_FOR_1_SECONDS_2,
- new Object[] {proxy.getDurableId(), Integer.valueOf(proxy.getDurableTimeout()),
- proxy}));
+ new Object[] {proxy.getDurableId(), proxy.getDurableTimeout(), proxy}));
} else {
closeAllClientCqs(proxy);
if (isDebugEnabled) {
@@ -1771,10 +1873,10 @@ public class CacheClientNotifier {
}
/**
- * Registers a new <code>InterestRegistrationListener</code> with the set of
- * <code>InterestRegistrationListener</code>s.
+ * Registers a new {@code InterestRegistrationListener} with the set of
+ * {@code InterestRegistrationListener}s.
*
- * @param listener The <code>InterestRegistrationListener</code> to register
+ * @param listener The {@code InterestRegistrationListener} to register
*
* @since GemFire 5.8Beta
*/
@@ -1783,10 +1885,10 @@ public class CacheClientNotifier {
}
/**
- * Unregisters an existing <code>InterestRegistrationListener</code> from the set of
- * <code>InterestRegistrationListener</code>s.
+ * Unregisters an existing {@code InterestRegistrationListener} from the set of
+ * {@code InterestRegistrationListener}s.
*
- * @param listener The <code>InterestRegistrationListener</code> to unregister
+ * @param listener The {@code InterestRegistrationListener} to unregister
*
* @since GemFire 5.8Beta
*/
@@ -1795,11 +1897,11 @@ public class CacheClientNotifier {
}
/**
- * Returns a read-only collection of <code>InterestRegistrationListener</code>s registered with
- * this notifier.
+ * Returns a read-only collection of {@code InterestRegistrationListener}s registered with this
+ * notifier.
*
- * @return a read-only collection of <code>InterestRegistrationListener</code>s registered with
- * this notifier
+ * @return a read-only collection of {@code InterestRegistrationListener}s registered with this
+ * notifier
*
* @since GemFire 5.8Beta
*/
@@ -1811,17 +1913,17 @@ public class CacheClientNotifier {
*
* @since GemFire 5.8Beta
*/
- protected boolean containsInterestRegistrationListeners() {
+ boolean containsInterestRegistrationListeners() {
return !this.writableInterestRegistrationListeners.isEmpty();
}
/**
- *
* @since GemFire 5.8Beta
*/
- protected void notifyInterestRegistrationListeners(InterestRegistrationEvent event) {
- for (Iterator i = this.writableInterestRegistrationListeners.iterator(); i.hasNext();) {
- InterestRegistrationListener listener = (InterestRegistrationListener) i.next();
+ void notifyInterestRegistrationListeners(InterestRegistrationEvent event) {
+ for (Object writableInterestRegistrationListener : this.writableInterestRegistrationListeners) {
+ InterestRegistrationListener listener =
+ (InterestRegistrationListener) writableInterestRegistrationListener;
if (event.isRegister()) {
listener.afterRegisterInterest(event);
} else {
@@ -1836,207 +1938,70 @@ public class CacheClientNotifier {
* @return the statistics for the notifier
*/
public CacheClientNotifierStats getStats() {
- return this._statistics;
+ return this.statistics;
}
/**
- * Returns this <code>CacheClientNotifier</code>'s <code>InternalCache</code>.
+ * Returns this {@code CacheClientNotifier}'s {@code InternalCache}.
*
- * @return this <code>CacheClientNotifier</code>'s <code>InternalCache</code>
+ * @return this {@code CacheClientNotifier}'s {@code InternalCache}
*/
protected InternalCache getCache() { // TODO:SYNC: looks wrong
- if (this._cache != null && this._cache.isClosed()) {
+ if (this.cache != null && this.cache.isClosed()) {
InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null) {
- this._cache = cache;
+ this.cache = cache;
this.logWriter = cache.getInternalLogWriter();
this.securityLogWriter = cache.getSecurityInternalLogWriter();
}
}
- return this._cache;
+ return this.cache;
}
/**
- * Returns this <code>CacheClientNotifier</code>'s maximum message count.
+ * Returns this {@code CacheClientNotifier}'s maximum message count.
*
- * @return this <code>CacheClientNotifier</code>'s maximum message count
+ * @return this {@code CacheClientNotifier}'s maximum message count
*/
protected int getMaximumMessageCount() {
return this.maximumMessageCount;
}
/**
- * Returns this <code>CacheClientNotifier</code>'s message time-to-live.
+ * Returns this {@code CacheClientNotifier}'s message time-to-live.
*
- * @return this <code>CacheClientNotifier</code>'s message time-to-live
+ * @return this {@code CacheClientNotifier}'s message time-to-live
*/
protected int getMessageTimeToLive() {
return this.messageTimeToLive;
}
- protected void handleInterestEvent(InterestRegistrationEvent event) {
+ void handleInterestEvent(InterestRegistrationEvent event) {
LocalRegion region = (LocalRegion) event.getRegion();
region.handleInterestEvent(event);
}
- /**
- * @param cache The GemFire <code>InternalCache</code>
- * @param listener a listener which should receive notifications abouts queues being added or
- * removed.
- */
- private CacheClientNotifier(InternalCache cache, CacheServerStats acceptorStats,
- int maximumMessageCount, int messageTimeToLive, ConnectionListener listener,
- List overflowAttributesList, boolean isGatewayReceiver) {
- // Set the Cache
- setCache(cache);
- this.acceptorStats = acceptorStats;
- this.socketCloser = new SocketCloser(1, 50); // we only need one thread per client and wait 50ms
- // for close
-
- // Set the LogWriter
- this.logWriter = (InternalLogWriter) cache.getLogger();
-
- this._connectionListener = listener;
-
- // Set the security LogWriter
- this.securityLogWriter = (InternalLogWriter) cache.getSecurityLogger();
-
- this.maximumMessageCount = maximumMessageCount;
- this.messageTimeToLive = messageTimeToLive;
-
- // Initialize the statistics
- StatisticsFactory factory;
- if (isGatewayReceiver) {
- factory = new DummyStatisticsFactory();
- } else {
- factory = this.getCache().getDistributedSystem();
- }
- this._statistics = new CacheClientNotifierStats(factory);
-
- try {
- this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY));
- if (this.logFrequency <= 0) {
- this.logFrequency = DEFAULT_LOG_FREQUENCY;
- }
- } catch (Exception e) {
- this.logFrequency = DEFAULT_LOG_FREQUENCY;
- }
-
- eventEnqueueWaitTime =
- Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME);
- if (eventEnqueueWaitTime < 0) {
- eventEnqueueWaitTime = DEFAULT_EVENT_ENQUEUE_WAIT_TIME;
- }
-
- // Schedule task to periodically ping clients.
- scheduleClientPingTask();
- }
-
- /**
- * this message is used to send interest registration to another server. Since interest
- * registration performs a state-flush operation this message must not transmitted on an ordered
- * socket
- */
- public static class ServerInterestRegistrationMessage extends HighPriorityDistributionMessage
- implements MessageWithReply {
- ClientProxyMembershipID clientId;
- ClientInterestMessageImpl clientMessage;
- int processorId;
-
- ServerInterestRegistrationMessage(ClientProxyMembershipID clientID,
- ClientInterestMessageImpl msg) {
- this.clientId = clientID;
- this.clientMessage = msg;
- }
-
- public ServerInterestRegistrationMessage() {}
-
- static void sendInterestChange(DM dm, ClientProxyMembershipID clientID,
- ClientInterestMessageImpl msg) {
- ServerInterestRegistrationMessage smsg = new ServerInterestRegistrationMessage(clientID, msg);
- Set recipients = dm.getOtherDistributionManagerIds();
- smsg.setRecipients(recipients);
- ReplyProcessor21 rp = new ReplyProcessor21(dm, recipients);
- smsg.processorId = rp.getProcessorId();
- dm.putOutgoing(smsg);
- try {
- rp.waitForReplies();
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- }
-
- @Override
- protected void process(DistributionManager dm) {
- // Get the proxy for the proxy id
- try {
- CacheClientNotifier ccn = CacheClientNotifier.getInstance();
- if (ccn != null) {
- CacheClientProxy proxy = ccn.getClientProxy(clientId);
- // If this VM contains a proxy for the requested proxy id, forward the
- // message on to the proxy for processing
- if (proxy != null) {
- proxy.processInterestMessage(this.clientMessage);
- }
- }
- } finally {
- ReplyMessage reply = new ReplyMessage();
- reply.setProcessorId(this.processorId);
- reply.setRecipient(getSender());
- try {
- dm.putOutgoing(reply);
- } catch (CancelException e) {
- // can't send a reply, so ignore the exception
- }
- }
- }
-
- public int getDSFID() {
- return SERVER_INTEREST_REGISTRATION_MESSAGE;
- }
-
- @Override
- public void toData(DataOutput out) throws IOException {
- super.toData(out);
- out.writeInt(this.processorId);
- InternalDataSerializer.invokeToData(this.clientId, out);
- InternalDataSerializer.invokeToData(this.clientMessage, out);
- }
-
- @Override
- public void fromData(DataInput in) throws IOException, ClassNotFoundException {
- super.fromData(in);
- this.processorId = in.readInt();
- this.clientId = new ClientProxyMembershipID();
- InternalDataSerializer.invokeFromData(this.clientId, in);
- this.clientMessage = new ClientInterestMessageImpl();
- InternalDataSerializer.invokeFromData(this.clientMessage, in);
- }
- }
-
- protected void deliverInterestChange(ClientProxyMembershipID proxyID,
- ClientInterestMessageImpl message) {
- DM dm = ((InternalDistributedSystem) this.getCache().getDistributedSystem())
- .getDistributionManager();
+ void deliverInterestChange(ClientProxyMembershipID proxyID, ClientInterestMessageImpl message) {
+ DM dm = getCache().getInternalDistributedSystem().getDistributionManager();
ServerInterestRegistrationMessage.sendInterestChange(dm, proxyID, message);
}
- public CacheServerStats getAcceptorStats() {
+ CacheServerStats getAcceptorStats() {
return this.acceptorStats;
}
- public SocketCloser getSocketCloser() {
+ SocketCloser getSocketCloser() {
return this.socketCloser;
}
public void addCompiledQuery(DefaultQuery query) {
if (this.compiledQueries.putIfAbsent(query.getQueryString(), query) == null) {
// Added successfully.
- this._statistics.incCompiledQueryCount(1);
+ this.statistics.incCompiledQueryCount(1);
if (logger.isDebugEnabled()) {
logger.debug(
"Added compiled query into ccn.compliedQueries list. Query: {}. Total compiled queries: {}",
- query.getQueryString(), this._statistics.getCompiledQueryCount());
+ query.getQueryString(), this.statistics.getCompiledQueryCount());
}
// Start the clearIdleCompiledQueries thread.
startCompiledQueryCleanupThread();
@@ -2048,13 +2013,13 @@ public class CacheClientNotifier {
}
private void clearCompiledQueries() {
- if (this.compiledQueries.size() > 0) {
- this._statistics.incCompiledQueryCount(-(this.compiledQueries.size()));
+ if (!this.compiledQueries.isEmpty()) {
+ this.statistics.incCompiledQueryCount(-this.compiledQueries.size());
this.compiledQueries.clear();
if (logger.isDebugEnabled()) {
logger.debug(
"Removed all compiled queries from ccn.compliedQueries list. Total compiled queries: {}",
- this._statistics.getCompiledQueryCount());
+ this.statistics.getCompiledQueryCount());
}
}
}
@@ -2064,7 +2029,7 @@ public class CacheClientNotifier {
* checks for the compiled queries that are not used and removes them.
*/
private void startCompiledQueryCleanupThread() {
- if (isCompiledQueryCleanupThreadStarted) {
+ if (this.isCompiledQueryCleanupThreadStarted) {
return;
}
@@ -2082,11 +2047,11 @@ public class CacheClientNotifier {
} else {
if (compiledQueries.remove(e.getKey()) != null) {
// If successfully removed decrement the counter.
- _statistics.incCompiledQueryCount(-1);
+ statistics.incCompiledQueryCount(-1);
if (isDebugEnabled) {
logger.debug("Removed compiled query from ccn.compliedQueries list. Query: "
+ q.getQueryString() + ". Total compiled queries are : "
- + _statistics.getCompiledQueryCount());
+ + statistics.getCompiledQueryCount());
}
}
}
@@ -2094,23 +2059,23 @@ public class CacheClientNotifier {
}
};
- synchronized (lockIsCompiledQueryCleanupThreadStarted) {
- if (!isCompiledQueryCleanupThreadStarted) {
+ synchronized (this.lockIsCompiledQueryCleanupThreadStarted) {
+ if (!this.isCompiledQueryCleanupThreadStarted) {
long period = DefaultQuery.TEST_COMPILED_QUERY_CLEAR_TIME > 0
? DefaultQuery.TEST_COMPILED_QUERY_CLEAR_TIME : DefaultQuery.COMPILED_QUERY_CLEAR_TIME;
- _cache.getCCPTimer().scheduleAtFixedRate(task, period, period);
+ this.cache.getCCPTimer().scheduleAtFixedRate(task, period, period);
}
- isCompiledQueryCleanupThreadStarted = true;
+ this.isCompiledQueryCleanupThreadStarted = true;
}
}
- protected void scheduleClientPingTask() {
+ void scheduleClientPingTask() {
this.clientPingTask = new SystemTimer.SystemTimerTask() {
@Override
public void run2() {
// If there are no proxies, return
- if (CacheClientNotifier.this._clientProxies.isEmpty()) {
+ if (clientProxies.isEmpty()) {
return;
}
@@ -2145,144 +2110,10 @@ public class CacheClientNotifier {
if (logger.isDebugEnabled()) {
logger.debug("Scheduling client ping task with period={} ms", CLIENT_PING_TASK_PERIOD);
}
- CacheClientNotifier.this._cache.getCCPTimer().scheduleAtFixedRate(this.clientPingTask,
+ CacheClientNotifier.this.cache.getCCPTimer().scheduleAtFixedRate(this.clientPingTask,
CLIENT_PING_TASK_PERIOD, CLIENT_PING_TASK_PERIOD);
}
- /**
- * A string representing all hosts used for delivery purposes.
- */
- protected static final String ALL_HOSTS = "ALL_HOSTS";
-
- /**
- * An int representing all ports used for delivery purposes.
- */
- protected static final int ALL_PORTS = -1;
-
- /**
- * The map of known <code>CacheClientProxy</code> instances. Maps ClientProxyMembershipID to
- * CacheClientProxy. Note that the keys in this map are not updated when a durable client
- * reconnects. To make sure you get the updated ClientProxyMembershipID use this map to lookup the
- * CacheClientProxy and then call getProxyID on it.
- */
- private final ConcurrentMap/* <ClientProxyMembershipID, CacheClientProxy> */ _clientProxies =
- new ConcurrentHashMap();
-
- /**
- * The map of <code>CacheClientProxy</code> instances which are getting initialized. Maps
- * ClientProxyMembershipID to CacheClientProxy.
- */
- private final ConcurrentMap/* <ClientProxyMembershipID, CacheClientProxy> */ _initClientProxies =
- new ConcurrentHashMap();
-
- private final HashSet<ClientProxyMembershipID> timedOutDurableClientProxies =
- new HashSet<ClientProxyMembershipID>();
-
- /**
- * The GemFire <code>InternalCache</code>. Note that since this is a singleton class you should
- * not use a direct reference to _cache in CacheClientNotifier code. Instead, you should always
- * use <code>getCache()</code>
- */
- private InternalCache _cache;
-
- private InternalLogWriter logWriter;
-
- /**
- * The GemFire security <code>LogWriter</code>
- */
- private InternalLogWriter securityLogWriter;
-
- /** the maximum number of messages that can be enqueued in a client-queue. */
- private int maximumMessageCount;
-
- /**
- * the time (in seconds) after which a message in the client queue will expire.
- */
- private int messageTimeToLive;
-
- /**
- * A listener which receives notifications about queues that are added or removed
- */
- private ConnectionListener _connectionListener;
-
- private CacheServerStats acceptorStats;
-
- /**
- * haContainer can hold either the name of the client-messages-region (in case of eviction
- * policies "mem" or "entry") or an instance of HashMap (in case of eviction policy "none"). In
- * both the cases, it'll store HAEventWrapper as its key and ClientUpdateMessage as its value.
- */
- private volatile HAContainerWrapper haContainer;
-
- /**
- * The size of the server-to-client communication socket buffers. This can be modified using the
- * BridgeServer.SOCKET_BUFFER_SIZE system property.
- */
- static final private int socketBufferSize =
- Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", 32768).intValue();
-
- /**
- * The statistics for this notifier
- */
- protected final CacheClientNotifierStats _statistics;
-
- /**
- * The <code>InterestRegistrationListener</code> instances registered in this VM. This is used
- * when modifying the set of listeners.
- */
- private final Set writableInterestRegistrationListeners = new CopyOnWriteArraySet();
-
- /**
- * The <code>InterestRegistrationListener</code> instances registered in this VM. This is used to
- * provide a read-only <code>Set</code> of listeners.
- */
- private final Set readableInterestRegistrationListeners =
- Collections.unmodifiableSet(writableInterestRegistrationListeners);
-
- /**
- * System property name for indicating how much frequently the "Queue full" message should be
- * logged.
- */
- public static final String MAX_QUEUE_LOG_FREQUENCY =
- DistributionConfig.GEMFIRE_PREFIX + "logFrequency.clientQueueReachedMaxLimit";
-
- public static final long DEFAULT_LOG_FREQUENCY = 1000;
-
- public static final String EVENT_ENQUEUE_WAIT_TIME_NAME =
- DistributionConfig.GEMFIRE_PREFIX + "subscription.EVENT_ENQUEUE_WAIT_TIME";
-
- public static final int DEFAULT_EVENT_ENQUEUE_WAIT_TIME = 100;
-
- /**
- * System property value denoting the time in milliseconds. Any thread putting an event into a
- * subscription queue, which is full, will wait this much time for the queue to make space. It'll
- * then enque the event possibly causing the queue to grow beyond its capacity/max-size. See
- * #51400.
- */
- public static int eventEnqueueWaitTime;
-
- /**
- * The frequency of logging the "Queue full" message.
- */
- private long logFrequency = DEFAULT_LOG_FREQUENCY;
-
- private final ConcurrentHashMap<String, DefaultQuery> compiledQueries =
- new ConcurrentHashMap<String, DefaultQuery>();
-
- private volatile boolean isCompiledQueryCleanupThreadStarted = false;
-
- private final Object lockIsCompiledQueryCleanupThreadStarted = new Object();
-
- private SystemTimer.SystemTimerTask clientPingTask;
-
- private final SocketCloser socketCloser;
-
- private static final long CLIENT_PING_TASK_PERIOD =
- Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingPeriod", 60000);
-
- private static final long CLIENT_PING_TASK_COUNTER =
- Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingCounter", 3);
-
public long getLogFrequency() {
return this.logFrequency;
}
@@ -2291,64 +2122,153 @@ public class CacheClientNotifier {
* @return the haContainer
*/
public Map getHaContainer() {
- return haContainer;
+ return this.haContainer;
}
- public void initHaContainer(List overflowAttributesList) {
+ private void initHaContainer(List overflowAttributesList) {
// lazily initialize haContainer in case this CCN instance was created by a gateway receiver
if (overflowAttributesList != null
&& !HARegionQueue.HA_EVICTION_POLICY_NONE.equals(overflowAttributesList.get(0))) {
- haContainer = new HAContainerRegion(_cache.getRegion(Region.SEPARATOR
- + CacheServerImpl.clientMessagesRegion(_cache, (String) overflowAttributesList.get(0),
- ((Integer) overflowAttributesList.get(1)).intValue(),
- ((Integer) overflowAttributesList.get(2)).intValue(),
+ this.haContainer = new HAContainerRegion(this.cache.getRegion(Region.SEPARATOR
+ + CacheServerImpl.clientMessagesRegion(this.cache, (String) overflowAttributesList.get(0),
+ (Integer) overflowAttributesList.get(1), (Integer) overflowAttributesList.get(2),
(String) overflowAttributesList.get(3), (Boolean) overflowAttributesList.get(4))));
} else {
- haContainer = new HAContainerMap(new ConcurrentHashMap());
+ this.haContainer = new HAContainerMap(new ConcurrentHashMap());
}
- assert haContainer != null;
+ assert this.haContainer != null;
if (logger.isDebugEnabled()) {
- logger.debug("ha container ({}) has been created.", haContainer.getName());
+ logger.debug("ha container ({}) has been created.", this.haContainer.getName());
}
}
- private final Set blackListedClients = new CopyOnWriteArraySet();
-
- public void addToBlacklistedClient(ClientProxyMembershipID proxyID) {
- blackListedClients.add(proxyID);
+ void addToBlacklistedClient(ClientProxyMembershipID proxyID) {
+ this.blackListedClients.add(proxyID);
// ensure that cache and distributed system state are current and open
- this.getCache();
+ getCache();
new ScheduledThreadPoolExecutor(1).schedule(new ExpireBlackListTask(proxyID), 120,
TimeUnit.SECONDS);
}
- public Set getBlacklistedClient() {
- return blackListedClients;
+ Set getBlacklistedClient() {
+ return this.blackListedClients;
}
/**
- * @param _cache the _cache to set
+ * @param cache the cache to set
*/
- private void setCache(InternalCache _cache) {
- this._cache = _cache;
+ private void setCache(InternalCache cache) {
+ this.cache = cache;
}
+ /**
+ * Non-static inner class ExpireBlackListTask
+ */
private class ExpireBlackListTask extends PoolTask {
- private ClientProxyMembershipID proxyID;
+ private final ClientProxyMembershipID proxyID;
- public ExpireBlackListTask(ClientProxyMembershipID proxyID) {
+ ExpireBlackListTask(ClientProxyMembershipID proxyID) {
this.proxyID = proxyID;
}
@Override
public void run2() {
- if (blackListedClients.remove(proxyID)) {
+ if (blackListedClients.remove(this.proxyID)) {
if (logger.isDebugEnabled()) {
- logger.debug("{} client is no longer blacklisted", proxyID);
+ logger.debug("{} client is no longer blacklisted", this.proxyID);
}
}
}
}
+
+ /**
+ * Static inner-class ServerInterestRegistrationMessage
+ * <p>
+ * this message is used to send interest registration to another server. Since interest
+ * registration performs a state-flush operation this message must not transmitted on an ordered
+ * socket
+ */
+ public static class ServerInterestRegistrationMessage extends HighPriorityDistributionMessage
+ implements MessageWithReply {
+
+ ClientProxyMembershipID clientId;
+ ClientInterestMessageImpl clientMessage;
+ int processorId;
+
+ ServerInterestRegistrationMessage(ClientProxyMembershipID clientID,
+ ClientInterestMessageImpl msg) {
+ this.clientId = clientID;
+ this.clientMessage = msg;
+ }
+
+ public ServerInterestRegistrationMessage() {
+ // nothing
+ }
+
+ static void sendInterestChange(DM dm, ClientProxyMembershipID clientID,
+ ClientInterestMessageImpl msg) {
+ ServerInterestRegistrationMessage registrationMessage =
+ new ServerInterestRegistrationMessage(clientID, msg);
+ Set recipients = dm.getOtherDistributionManagerIds();
+ registrationMessage.setRecipients(recipients);
+ ReplyProcessor21 rp = new ReplyProcessor21(dm, recipients);
+ registrationMessage.processorId = rp.getProcessorId();
+ dm.putOutgoing(registrationMessage);
+ try {
+ rp.waitForReplies();
+ } catch (InterruptedException ignore) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ protected void process(DistributionManager dm) {
+ // Get the proxy for the proxy id
+ try {
+ CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance();
+ if (clientNotifier != null) {
+ CacheClientProxy proxy = clientNotifier.getClientProxy(this.clientId);
+ // If this VM contains a proxy for the requested proxy id, forward the
+ // message on to the proxy f
<TRUNCATED>