You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/31 23:15:27 UTC
[28/35] geode git commit: GEODE-2632: refactoring preparations for
SecurityService and BaseCommand changes
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
index 291db65..8915c55 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
@@ -94,9 +94,9 @@ import org.apache.geode.security.AuthenticationRequiredException;
import org.apache.geode.security.GemFireSecurityException;
/**
- * <code>CacheClientUpdater</code> is a thread that processes update messages from a cache server
- * and {@linkplain org.apache.geode.cache.Region#localInvalidate(Object) invalidates} the local
- * cache based on the contents of those messages.
+ * {@code CacheClientUpdater} is a thread that processes update messages from a cache server and
+ * {@linkplain org.apache.geode.cache.Region#localInvalidate(Object) invalidates} the local cache
+ * based on the contents of those messages.
*
* @since GemFire 3.5
*/
@@ -104,6 +104,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
private static final Logger logger = LogService.getLogger();
+ private static final int DEFAULT_SOCKET_BUFFER_SIZE = 32768;
+
/**
* true if the constructor successfully created a connection. If false, the run method for this
* thread immediately exits.
@@ -129,6 +131,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
* The input stream of the socket
*/
private final InputStream in;
+
/**
* Failed updater from the endpoint previously known as the primary
*/
@@ -139,12 +142,12 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
*/
private final ByteBuffer commBuffer;
- private boolean commBufferReleased;
+ private boolean commBufferReleased; // TODO: fix synchronization
private final CCUStats stats;
/**
- * Cache for which we provide service
+ * Cache for which we provide service TODO: lifecycle and synchronization need work
*/
private /* final */ InternalCache cache;
@@ -175,18 +178,18 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
*/
private boolean isOpCompleted;
- public final static String CLIENT_UPDATER_THREAD_NAME = "Cache Client Updater Thread ";
+ public static final String CLIENT_UPDATER_THREAD_NAME = "Cache Client Updater Thread ";
/**
- * to enable test flag
+ * to enable test flag TODO: eliminate isUsedByTest
*/
public static boolean isUsedByTest;
/**
* Indicates if full value was requested from server as a result of failure in applying delta
- * bytes.
+ * bytes. TODO: only used for test assertion
*/
- public static boolean fullValueRequested = false;
+ static boolean fullValueRequested = false;
private final ServerLocation location;
@@ -195,8 +198,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
private EndpointManager eManager = null;
private Endpoint endpoint = null;
- static private final long MAX_CACHE_WAIT = Long
- .getLong(DistributionConfig.GEMFIRE_PREFIX + "CacheClientUpdater.MAX_WAIT", 120).longValue(); // seconds
+ private static final long MAX_CACHE_WAIT =
+ Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "CacheClientUpdater.MAX_WAIT", 120); // seconds
/**
* Return true if cache appears
@@ -231,7 +234,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
boolean interrupted = Thread.interrupted();
try {
Thread.sleep(1000);
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
interrupted = true;
} finally {
if (interrupted) {
@@ -245,12 +248,12 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
}
/**
- * Creates a new <code>CacheClientUpdater</code> with a given name that waits for a server to
- * connect on a given port.
+ * Creates a new {@code CacheClientUpdater} with a given name that waits for a server to connect
+ * on a given port.
*
* @param name descriptive name, used for our ThreadGroup
* @param location the endpoint we represent
- * @param primary true if our endpoint is primary TODO ask the ep for this?
+ * @param primary true if our endpoint is primary
* @param ids the system we are distributing messages through
*
* @throws AuthenticationRequiredException when client is not configured to send credentials using
@@ -265,6 +268,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
Endpoint endpoint, int handshakeTimeout, SocketCreator socketCreator)
throws AuthenticationRequiredException, AuthenticationFailedException,
ServerRefusedConnectionException {
+
super(LoggingThreadGroup.createThreadGroup("Client update thread"), name);
this.setDaemon(true);
this.system = (InternalDistributedSystem) ids;
@@ -276,6 +280,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
this.eManager = eManager;
this.endpoint = endpoint;
this.stats = new CCUStats(this.system, this.location);
+
// Create the connection...
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
@@ -291,7 +296,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
try {
// Size of the server-to-client communication socket buffers
int socketBufferSize =
- Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", 32768).intValue();
+ Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", DEFAULT_SOCKET_BUFFER_SIZE);
mySock = socketCreator.connectForClient(location.getHostName(), location.getPort(),
handshakeTimeout, socketBufferSize);
@@ -327,31 +332,27 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
}
}
- {
- int bufSize = 1024;
- try {
- bufSize = mySock.getSendBufferSize();
- if (bufSize < 1024) {
- bufSize = 1024;
- }
- } catch (SocketException ignore) {
+ int bufSize = 1024;
+ try {
+ bufSize = mySock.getSendBufferSize();
+ if (bufSize < 1024) {
+ bufSize = 1024;
}
- cb = ServerConnection.allocateCommBuffer(bufSize, mySock);
- }
- {
- // create a "server" memberId we currently don't know much about the
- // server.
- // Would be nice for it to send us its member id
- // TODO: change the serverId to use the endpoint's getMemberId() which
- // returns a
- // DistributedMember (once gfecq branch is merged to trunk).
- MemberAttributes ma =
- new MemberAttributes(0, -1, DistributionManager.NORMAL_DM_TYPE, -1, null, null, null);
- sid = new InternalDistributedMember(mySock.getInetAddress(), mySock.getPort(), false, true,
- ma);
+ } catch (SocketException ignore) {
}
+ cb = ServerConnection.allocateCommBuffer(bufSize, mySock);
+
+ // create a "server" memberId we currently don't know much about the server.
+ // Would be nice for it to send us its member id
+ // TODO: change the serverId to use the endpoint's getMemberId() which returns a
+ // DistributedMember (once gfecq branch is merged to trunk).
+ MemberAttributes ma =
+ new MemberAttributes(0, -1, DistributionManager.NORMAL_DM_TYPE, -1, null, null, null);
+ sid =
+ new InternalDistributedMember(mySock.getInetAddress(), mySock.getPort(), false, true, ma);
+
success = true;
- } catch (ConnectException e) {
+ } catch (ConnectException ignore) {
if (!quitting()) {
logger.warn(LocalizedMessage
.create(LocalizedStrings.CacheClientUpdater_0_CONNECTION_WAS_REFUSED, this));
@@ -385,20 +386,22 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
e.getMessage()));
}
} finally {
- connected = success;
+ this.connected = success;
if (mySock != null) {
try {
mySock.setSoTimeout(0);
- } catch (SocketException e) {
+ } catch (SocketException ignore) {
// ignore: nothing we can do about this
}
}
- if (connected) {
- socket = mySock;
- out = tmpOut;
- in = tmpIn;
- serverId = sid;
- commBuffer = cb;
+
+ if (this.connected) {
+ this.socket = mySock;
+ this.out = tmpOut;
+ this.in = tmpIn;
+ this.serverId = sid;
+ this.commBuffer = cb;
+
// Don't want the timeout after handshake
if (mySock != null) {
try {
@@ -406,12 +409,13 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
} catch (SocketException ignore) {
}
}
+
} else {
- socket = null;
- serverId = null;
- commBuffer = null;
- out = null;
- in = null;
+ this.socket = null;
+ this.serverId = null;
+ this.commBuffer = null;
+ this.out = null;
+ this.in = null;
if (mySock != null) {
try {
@@ -439,29 +443,31 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
}
public boolean isConnected() {
- return connected;
+ return this.connected;
}
+ @Override
public boolean isPrimary() {
- return isPrimary;
+ return this.isPrimary;
}
public InternalLogWriter getSecurityLogger() {
return this.qManager.getSecurityLogger();
}
+ @Override
public void setFailedUpdater(ClientUpdater failedUpdater) {
this.failedUpdater = failedUpdater;
}
/**
- * Performs the work of the client update thread. Creates a <code>ServerSocket</code> and waits
- * for the server to connect to it.
+ * Performs the work of the client update thread. Creates a {@code ServerSocket} and waits for the
+ * server to connect to it.
*/
@Override
public void run() {
+ EntryLogger.setSource(this.serverId, "RI");
boolean addedListener = false;
- EntryLogger.setSource(serverId, "RI");
try {
this.system.addDisconnectListener(this);
addedListener = true;
@@ -472,8 +478,10 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
return;
}
processMessages();
- } catch (CancelException e) {
- return; // just bail
+
+ } catch (CancelException ignore) {
+ // just bail
+
} finally {
if (addedListener) {
this.system.removeDisconnectListener(this);
@@ -486,8 +494,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
/**
* Notifies this thread to stop processing
*/
- protected void stopProcessing() {
- continueProcessing.set(false);// = false;
+ private void stopProcessing() {
+ this.continueProcessing.set(false);
}
/**
@@ -495,39 +503,27 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
* duplicates. Note: this method is not named stop because this is a Thread which has a deprecated
* stop method.
*/
- public void stopUpdater() {
+ private void stopUpdater() {
boolean isSelfDestroying = Thread.currentThread() == this;
-
stopProcessing();
+
// need to also close the socket for this interrupt to wakeup
// the thread. This fixes bug 35691.
- // this.close(); // this should not be done here.
if (this.isAlive()) {
if (logger.isDebugEnabled()) {
logger.debug("{}: Stopping {}", this.location, this);
}
+
if (!isSelfDestroying) {
interrupt();
try {
- if (socket != null) {
- socket.close();
+ if (this.socket != null) {
+ this.socket.close();
}
- } catch (VirtualMachineError err) {
- SystemFailure.initiateFailure(err);
- // If this ever returns, rethrow the error. We're poisoned
- // now, so don't let this thread continue.
- throw err;
- } catch (Throwable t) {
- // Whenever you catch Error or Throwable, you must also
- // catch VirtualMachineError (see above). However, there is
- // _still_ a possibility that you are dealing with a cascading
- // error condition, so you also need to check to see if the JVM
- // is still usable:
- SystemFailure.checkFailure();
- // dont care...
+ } catch (IOException e) {
if (logger.isDebugEnabled()) {
- logger.debug(t.getMessage(), t);
+ logger.debug(e.getMessage(), e);
}
}
} // !isSelfDestroying
@@ -537,32 +533,24 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
/**
* Signals the run thread to stop, closes underlying resources.
*/
+ @Override
public void close() {
- this.continueProcessing.set(false);// = false; // signals we are done.
+ this.continueProcessing.set(false); // signals we are done.
- // Close the socket
- // This will also cause the underlying streams to fail.
+ // Close the socket. This will also cause the underlying streams to fail.
try {
- if (socket != null) {
- socket.close();
+ if (this.socket != null) {
+ this.socket.close();
}
- } catch (Exception e) {
+ } catch (IOException ignore) {
// ignore
}
- try {
- this.stats.close();
- } catch (Exception e) {
- // ignore
- }
+ this.stats.close();
// close the helper
- try {
- if (cacheHelper != null) {
- cacheHelper.close();
- }
- } catch (Exception e) {
- // ignore
+ if (this.cacheHelper != null) {
+ this.cacheHelper.close();
}
releaseCommBuffer();
}
@@ -572,41 +560,32 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
* the server.
*/
private Message initializeMessage() {
- Message _message = new Message(2, Version.CURRENT);
- try {
- _message.setComms(socket, in, out, commBuffer, this.stats);
- } catch (IOException e) {
- if (!quitting()) {
- if (logger.isDebugEnabled()) {
- logger.debug(
- "{}: Caught following exception while attempting to initialize a server-to-client communication socket and will exit",
- this, e);
- }
- stopProcessing();
- }
- }
- return _message;
+ Message message = new Message(2, Version.CURRENT);
+ message.setComms(this.socket, this.in, this.out, this.commBuffer, this.stats);
+ return message;
}
/* refinement of method inherited from Thread */
@Override
public String toString() {
- return this.getName() + " (" + this.location.getHostName() + ":" + this.location.getPort()
- + ")";
+ return getName() + " (" + this.location.getHostName() + ':' + this.location.getPort() + ')';
}
/**
* Handle a marker message
*
- * @param m message containing the data
+ * @param clientMessage message containing the data
*/
- private void handleMarker(Message m) {
+ private void handleMarker(Message clientMessage) {
try {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
- logger.debug("Received marker message of length ({} bytes)", m.getPayloadLength());
+ logger.debug("Received marker message of length ({} bytes)",
+ clientMessage.getPayloadLength());
}
+
this.qManager.getState().processMarker();
+
if (isDebugEnabled) {
logger.debug("Processed marker message");
}
@@ -621,41 +600,40 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
/**
* Create or update an entry
*
- * @param m message containing the data
+ * @param clientMessage message containing the data
*/
- private void handleUpdate(Message m) {
+ private void handleUpdate(Message clientMessage) {
String regionName = null;
Object key = null;
Part valuePart = null;
- Object newValue = null;
- byte[] deltaBytes = null;
- Object fullValue = null;
- boolean isValueObject = false;
- int partCnt = 0;
final boolean isDebugEnabled = logger.isDebugEnabled();
+
try {
this.isOpCompleted = false;
+
// Retrieve the data from the put message parts
if (isDebugEnabled) {
- logger.debug("Received put message of length ({} bytes)", m.getPayloadLength());
+ logger.debug("Received put message of length ({} bytes)", clientMessage.getPayloadLength());
}
- Part regionNamePart = m.getPart(partCnt++);
- Part keyPart = m.getPart(partCnt++);
- boolean isDeltaSent = ((Boolean) m.getPart(partCnt++).getObject()).booleanValue();
- valuePart = m.getPart(partCnt++);
- Part callbackArgumentPart = m.getPart(partCnt++);
- VersionTag versionTag = (VersionTag) m.getPart(partCnt++).getObject();
+ int partCnt = 0;
+ Part regionNamePart = clientMessage.getPart(partCnt++);
+ Part keyPart = clientMessage.getPart(partCnt++);
+ boolean isDeltaSent = (Boolean) clientMessage.getPart(partCnt++).getObject();
+ valuePart = clientMessage.getPart(partCnt++);
+ Part callbackArgumentPart = clientMessage.getPart(partCnt++);
+ VersionTag versionTag = (VersionTag) clientMessage.getPart(partCnt++).getObject();
if (versionTag != null) {
versionTag.replaceNullIDs((InternalDistributedMember) this.endpoint.getMemberId());
}
- Part isInterestListPassedPart = m.getPart(partCnt++);
- Part hasCqsPart = m.getPart(partCnt++);
+ Part isInterestListPassedPart = clientMessage.getPart(partCnt++);
+ Part hasCqsPart = clientMessage.getPart(partCnt++);
- EventID eventId = (EventID) m.getPart(m.getNumberOfParts() - 1).getObject();
+ EventID eventId =
+ (EventID) clientMessage.getPart(clientMessage.getNumberOfParts() - 1).getObject();
- boolean withInterest = ((Boolean) isInterestListPassedPart.getObject()).booleanValue();
- boolean withCQs = ((Boolean) hasCqsPart.getObject()).booleanValue();
+ boolean withInterest = (Boolean) isInterestListPassedPart.getObject();
+ boolean withCQs = (Boolean) hasCqsPart.getObject();
regionName = regionNamePart.getString();
key = keyPart.getStringOrObject();
@@ -666,30 +644,39 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
// object, it will be stored as a CachedDeserializable and
// deserialized only when requested.
- boolean isCreate = (m.getMessageType() == MessageType.LOCAL_CREATE);
+ boolean isCreate = clientMessage.getMessageType() == MessageType.LOCAL_CREATE;
+
if (isDebugEnabled) {
- logger
- .debug(
- "Putting entry for region: {} key: {} create: {}{} callbackArgument: {} withInterest={} withCQs={} eventID={} version={}",
- regionName, key, isCreate,
- (valuePart.isObject() ? new StringBuilder(" value: ")
- .append(deserialize(valuePart.getSerializedForm())) : ""),
- callbackArgument, withInterest, withCQs, eventId, versionTag);
+ logger.debug(
+ "Putting entry for region: {} key: {} create: {}{} callbackArgument: {} withInterest={} withCQs={} eventID={} version={}",
+ regionName, key, isCreate,
+ valuePart.isObject()
+ ? new StringBuilder(" value: ").append(deserialize(valuePart.getSerializedForm()))
+ : "",
+ callbackArgument, withInterest, withCQs, eventId, versionTag);
}
- LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+ LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
+
+ Object newValue = null;
+ byte[] deltaBytes = null;
+ Object fullValue = null;
+ boolean isValueObject;
if (!isDeltaSent) {
// bug #42162 - must check for a serialized null here
byte[] serializedForm = valuePart.getSerializedForm();
+
if (isCreate && InternalDataSerializer.isSerializedNull(serializedForm)) {
// newValue = null; newValue is already null
} else {
newValue = valuePart.getSerializedForm();
}
+
if (withCQs) {
fullValue = valuePart.getObject();
}
+
isValueObject = valuePart.isObject();
} else {
deltaBytes = valuePart.getSerializedForm();
@@ -700,40 +687,49 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
if (isDebugEnabled && !quitting()) {
logger.debug("{}: Region named {} does not exist", this, regionName);
}
+
} else if (region.hasServerProxy() && ServerResponseMatrix
- .checkForValidStateAfterNotification(region, key, m.getMessageType())
+ .checkForValidStateAfterNotification(region, key, clientMessage.getMessageType())
&& (withInterest || !withCQs)) {
@Released
EntryEventImpl newEvent = null;
+
try {
// Create an event and put the entry
newEvent = EntryEventImpl.create(region,
- ((m.getMessageType() == MessageType.LOCAL_CREATE) ? Operation.CREATE
- : Operation.UPDATE),
+ clientMessage.getMessageType() == MessageType.LOCAL_CREATE ? Operation.CREATE
+ : Operation.UPDATE,
key, null /* newValue */, callbackArgument /* callbackArg */, true /* originRemote */,
eventId.getDistributedMember());
+
newEvent.setVersionTag(versionTag);
newEvent.setFromServer(true);
+
region.basicBridgeClientUpdate(eventId.getDistributedMember(), key, newValue, deltaBytes,
- isValueObject, callbackArgument, m.getMessageType() == MessageType.LOCAL_CREATE,
- qManager.getState().getProcessedMarker() || !this.isDurableClient, newEvent, eventId);
+ isValueObject, callbackArgument,
+ clientMessage.getMessageType() == MessageType.LOCAL_CREATE,
+ this.qManager.getState().getProcessedMarker() || !this.isDurableClient, newEvent,
+ eventId);
+
this.isOpCompleted = true;
+
// bug 45520 - ConcurrentCacheModificationException is not thrown and we must check this
// flag
- // if (newEvent.isConcurrencyConflict()) {
- // return; // this is logged elsewhere at fine level
- // }
if (withCQs && isDeltaSent) {
fullValue = newEvent.getNewValue();
}
- } catch (InvalidDeltaException ide) {
+ } catch (InvalidDeltaException ignore) {
Part fullValuePart = requestFullValue(eventId, "Caught InvalidDeltaException.");
region.getCachePerfStats().incDeltaFullValuesRequested();
- fullValue = newValue = fullValuePart.getObject();
- isValueObject = Boolean.valueOf(fullValuePart.isObject());
+ fullValue = newValue = fullValuePart.getObject(); // TODO: fix this line
+ isValueObject = fullValuePart.isObject();
+
region.basicBridgeClientUpdate(eventId.getDistributedMember(), key, newValue, null,
- isValueObject, callbackArgument, m.getMessageType() == MessageType.LOCAL_CREATE,
- qManager.getState().getProcessedMarker() || !this.isDurableClient, newEvent, eventId);
+ isValueObject, callbackArgument,
+ clientMessage.getMessageType() == MessageType.LOCAL_CREATE,
+ this.qManager.getState().getProcessedMarker() || !this.isDurableClient, newEvent,
+ eventId);
+
this.isOpCompleted = true;
} finally {
if (newEvent != null)
@@ -748,20 +744,19 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
// Update CQs. CQs can exist without client region.
if (withCQs) {
- Part numCqsPart = m.getPart(partCnt++);
+ Part numCqsPart = clientMessage.getPart(partCnt++);
if (isDebugEnabled) {
logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}",
numCqsPart.getInt() / 2);
}
- partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), key, fullValue,
- deltaBytes, eventId);
+ partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(),
+ clientMessage.getMessageType(), key, fullValue, deltaBytes, eventId);
this.isOpCompleted = true;
}
} catch (Exception e) {
String message =
LocalizedStrings.CacheClientUpdater_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_ATTEMPTING_TO_PUT_ENTRY_REGION_0_KEY_1_VALUE_2
- .toLocalizedString(
- new Object[] {regionName, key, deserialize(valuePart.getSerializedForm())});
+ .toLocalizedString(regionName, key, deserialize(valuePart.getSerializedForm()));
handleException(message, e);
}
}
@@ -774,12 +769,14 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
if (isDebugEnabled) {
logger.debug("{} Requesting full value...", reason);
}
- Part result = (Part) GetEventValueOp.executeOnPrimary(qManager.getPool(), eventId, null);
+ Part result = (Part) GetEventValueOp.executeOnPrimary(this.qManager.getPool(), eventId, null);
if (result == null) {
// Just log a warning. Do not stop CCU thread.
+ // TODO: throw a subclass of Exception
throw new Exception("Could not retrieve full value for " + eventId);
}
+
if (isDebugEnabled) {
logger.debug("Full value received.");
}
@@ -789,39 +786,41 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
/**
* Invalidate an entry
*
- * @param m message describing the entry
+ * @param clientMessage message describing the entry
*/
- private void handleInvalidate(Message m) {
+ private void handleInvalidate(Message clientMessage) {
String regionName = null;
Object key = null;
- int partCnt = 0;
-
final boolean isDebugEnabled = logger.isDebugEnabled();
+
try {
this.isOpCompleted = false;
+
// Retrieve the data from the local-invalidate message parts
if (isDebugEnabled) {
- logger.debug("Received invalidate message of length ({} bytes)", m.getPayloadLength());
+ logger.debug("Received invalidate message of length ({} bytes)",
+ clientMessage.getPayloadLength());
}
- Part regionNamePart = m.getPart(partCnt++);
- Part keyPart = m.getPart(partCnt++);
- Part callbackArgumentPart = m.getPart(partCnt++);
+ int partCnt = 0;
+ Part regionNamePart = clientMessage.getPart(partCnt++);
+ Part keyPart = clientMessage.getPart(partCnt++);
+ Part callbackArgumentPart = clientMessage.getPart(partCnt++);
- VersionTag versionTag = (VersionTag) m.getPart(partCnt++).getObject();
+ VersionTag versionTag = (VersionTag) clientMessage.getPart(partCnt++).getObject();
if (versionTag != null) {
versionTag.replaceNullIDs((InternalDistributedMember) this.endpoint.getMemberId());
}
- Part isInterestListPassedPart = m.getPart(partCnt++);
- Part hasCqsPart = m.getPart(partCnt++);
+ Part isInterestListPassedPart = clientMessage.getPart(partCnt++);
+ Part hasCqsPart = clientMessage.getPart(partCnt++);
regionName = regionNamePart.getString();
key = keyPart.getStringOrObject();
Object callbackArgument = callbackArgumentPart.getObject();
- boolean withInterest = ((Boolean) isInterestListPassedPart.getObject()).booleanValue();
- boolean withCQs = ((Boolean) hasCqsPart.getObject()).booleanValue();
+ boolean withInterest = (Boolean) isInterestListPassedPart.getObject();
+ boolean withCQs = (Boolean) hasCqsPart.getObject();
if (isDebugEnabled) {
logger.debug(
@@ -829,34 +828,36 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
regionName, key, callbackArgument, withInterest, withCQs, versionTag);
}
- LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+ LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
if (region == null) {
if (isDebugEnabled && !quitting()) {
logger.debug("Region named {} does not exist", regionName);
}
+
} else {
if (region.hasServerProxy() && (withInterest || !withCQs)) {
try {
- Part eid = m.getPart(m.getNumberOfParts() - 1);
+ Part eid = clientMessage.getPart(clientMessage.getNumberOfParts() - 1);
EventID eventId = (EventID) eid.getObject();
+
try {
region.basicBridgeClientInvalidate(eventId.getDistributedMember(), key,
callbackArgument,
- qManager.getState().getProcessedMarker() || !this.isDurableClient, eventId,
+ this.qManager.getState().getProcessedMarker() || !this.isDurableClient, eventId,
versionTag);
- } catch (ConcurrentCacheModificationException e) {
- // return; allow CQs to be processed
+ } catch (ConcurrentCacheModificationException ignore) {
+ // allow CQs to be processed
}
+
this.isOpCompleted = true;
// fix for 36615
- qManager.getState().incrementInvalidatedStats();
+ this.qManager.getState().incrementInvalidatedStats();
if (isDebugEnabled) {
logger.debug("Invalidated entry for region: {} key: {} callbackArgument: {}",
regionName, key, callbackArgument);
}
- } catch (EntryNotFoundException e) {
- /* ignore */
+ } catch (EntryNotFoundException ignore) {
if (isDebugEnabled && !quitting()) {
logger.debug("Already invalidated entry for region: {} key: {} callbackArgument: {}",
regionName, key, callbackArgument);
@@ -869,19 +870,20 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
if (withCQs) {
// The client may have been registered to receive invalidates for
// create and updates operations. Get the actual region operation.
- Part regionOpType = m.getPart(partCnt++);
- Part numCqsPart = m.getPart(partCnt++);
+ Part regionOpType = clientMessage.getPart(partCnt++);
+ Part numCqsPart = clientMessage.getPart(partCnt++);
if (isDebugEnabled) {
logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}",
numCqsPart.getInt() / 2);
}
- partCnt = processCqs(m, partCnt, numCqsPart.getInt(), regionOpType.getInt(), key, null);
+ partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(), regionOpType.getInt(),
+ key, null);
this.isOpCompleted = true;
}
} catch (Exception e) {
final String message =
LocalizedStrings.CacheClientUpdater_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_ATTEMPTING_TO_INVALIDATE_ENTRY_REGION_0_KEY_1
- .toLocalizedString(new Object[] {regionName, key});
+ .toLocalizedString(regionName, key);
handleException(message, e);
}
}
@@ -889,26 +891,27 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
/**
* locally destroy an entry
*
- * @param m message describing the entry
+ * @param clientMessage message describing the entry
*/
- private void handleDestroy(Message m) {
+ private void handleDestroy(Message clientMessage) {
String regionName = null;
Object key = null;
- int partCnt = 0;
-
final boolean isDebugEnabled = logger.isDebugEnabled();
+
try {
this.isOpCompleted = false;
// Retrieve the data from the local-destroy message parts
if (isDebugEnabled) {
- logger.debug("Received destroy message of length ({} bytes)", m.getPayloadLength());
+ logger.debug("Received destroy message of length ({} bytes)",
+ clientMessage.getPayloadLength());
}
- Part regionNamePart = m.getPart(partCnt++);
- Part keyPart = m.getPart(partCnt++);
- Part callbackArgumentPart = m.getPart(partCnt++);
+ int partCnt = 0;
+ Part regionNamePart = clientMessage.getPart(partCnt++);
+ Part keyPart = clientMessage.getPart(partCnt++);
+ Part callbackArgumentPart = clientMessage.getPart(partCnt++);
- VersionTag versionTag = (VersionTag) m.getPart(partCnt++).getObject();
+ VersionTag versionTag = (VersionTag) clientMessage.getPart(partCnt++).getObject();
if (versionTag != null) {
versionTag.replaceNullIDs((InternalDistributedMember) this.endpoint.getMemberId());
}
@@ -916,8 +919,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
regionName = regionNamePart.getString();
key = keyPart.getStringOrObject();
- Part isInterestListPassedPart = m.getPart(partCnt++);
- Part hasCqsPart = m.getPart(partCnt++);
+ Part isInterestListPassedPart = clientMessage.getPart(partCnt++);
+ Part hasCqsPart = clientMessage.getPart(partCnt++);
boolean withInterest = ((Boolean) isInterestListPassedPart.getObject()).booleanValue();
boolean withCQs = ((Boolean) hasCqsPart.getObject()).booleanValue();
@@ -929,30 +932,32 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
regionName, key, callbackArgument, withInterest, withCQs, versionTag);
}
- LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
- EventID eventId = null;
+ LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
if (region == null) {
if (isDebugEnabled && !quitting()) {
logger.debug("Region named {} does not exist", regionName);
}
+
} else if (region.hasServerProxy() && (withInterest || !withCQs)) {
+ EventID eventId = null;
try {
- Part eid = m.getPart(m.getNumberOfParts() - 1);
+ Part eid = clientMessage.getPart(clientMessage.getNumberOfParts() - 1);
eventId = (EventID) eid.getObject();
+
try {
region.basicBridgeClientDestroy(eventId.getDistributedMember(), key, callbackArgument,
- qManager.getState().getProcessedMarker() || !this.isDurableClient, eventId,
+ this.qManager.getState().getProcessedMarker() || !this.isDurableClient, eventId,
versionTag);
- } catch (ConcurrentCacheModificationException e) {
- // return; allow CQs to be processed
+ } catch (ConcurrentCacheModificationException ignore) {
+ // allow CQs to be processed
}
+
this.isOpCompleted = true;
if (isDebugEnabled) {
logger.debug("Destroyed entry for region: {} key: {} callbackArgument: {}", regionName,
key, callbackArgument);
}
- } catch (EntryNotFoundException e) {
- /* ignore */
+ } catch (EntryNotFoundException ignore) {
if (isDebugEnabled && !quitting()) {
logger.debug(
"Already destroyed entry for region: {} key: {} callbackArgument: {} eventId={}",
@@ -963,18 +968,19 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
}
if (withCQs) {
- Part numCqsPart = m.getPart(partCnt++);
+ Part numCqsPart = clientMessage.getPart(partCnt++);
if (isDebugEnabled) {
logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}",
numCqsPart.getInt() / 2);
}
- partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), key, null);
+ partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(),
+ clientMessage.getMessageType(), key, null);
this.isOpCompleted = true;
}
} catch (Exception e) {
String message =
LocalizedStrings.CacheClientUpdater_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_ATTEMPTING_TO_DESTROY_ENTRY_REGION_0_KEY_1
- .toLocalizedString(new Object[] {regionName, key});
+ .toLocalizedString(regionName, key);
handleException(message, e);
}
}
@@ -982,44 +988,44 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
/**
* Locally destroy a region
*
- * @param m message describing the region
+ * @param clientMessage message describing the region
*/
- private void handleDestroyRegion(Message m) {
- Part regionNamePart = null, callbackArgumentPart = null;
+ private void handleDestroyRegion(Message clientMessage) {
String regionName = null;
- Object callbackArgument = null;
- LocalRegion region = null;
- int partCnt = 0;
-
final boolean isDebugEnabled = logger.isDebugEnabled();
+
try {
// Retrieve the data from the local-destroy-region message parts
if (isDebugEnabled) {
- logger.debug("Received destroy region message of length ({} bytes)", m.getPayloadLength());
+ logger.debug("Received destroy region message of length ({} bytes)",
+ clientMessage.getPayloadLength());
}
- regionNamePart = m.getPart(partCnt++);
- callbackArgumentPart = m.getPart(partCnt++);
+ int partCnt = 0;
+ Part regionNamePart = clientMessage.getPart(partCnt++);
+ Part callbackArgumentPart = clientMessage.getPart(partCnt++);
regionName = regionNamePart.getString();
- callbackArgument = callbackArgumentPart.getObject();
+ Object callbackArgument = callbackArgumentPart.getObject();
- Part hasCqsPart = m.getPart(partCnt++);
+ Part hasCqsPart = clientMessage.getPart(partCnt++);
if (isDebugEnabled) {
logger.debug("Destroying region: {} callbackArgument: {}", regionName, callbackArgument);
}
// Handle CQs if any on this region.
- if (((Boolean) hasCqsPart.getObject()).booleanValue()) {
- Part numCqsPart = m.getPart(partCnt++);
+ if ((Boolean) hasCqsPart.getObject()) {
+ Part numCqsPart = clientMessage.getPart(partCnt++);
if (isDebugEnabled) {
logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}",
numCqsPart.getInt() / 2);
}
- partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), null, null);
+ // TODO: partCnt is unused -- does processCqs have side effects
+ partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(),
+ clientMessage.getMessageType(), null, null);
}
// Confirm that the region exists
- region = (LocalRegion) cacheHelper.getRegion(regionName);
+ LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
if (region == null) {
if (isDebugEnabled && !quitting()) {
logger.debug("Region named {} does not exist", regionName);
@@ -1036,7 +1042,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
logger.debug("Destroyed region: {} callbackArgument: {}", regionName, callbackArgument);
}
}
- } catch (RegionDestroyedException e) { // already destroyed
+ } catch (RegionDestroyedException ignore) { // already destroyed
if (isDebugEnabled) {
logger.debug("region already destroyed: {}", regionName);
}
@@ -1051,24 +1057,24 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
/**
* Locally clear a region
*
- * @param m message describing the region to clear
+ * @param clientMessage message describing the region to clear
*/
- private void handleClearRegion(Message m) {
+ private void handleClearRegion(Message clientMessage) {
String regionName = null;
- int partCnt = 0;
-
final boolean isDebugEnabled = logger.isDebugEnabled();
+
try {
// Retrieve the data from the clear-region message parts
if (isDebugEnabled) {
logger.debug("{}: Received clear region message of length ({} bytes)", this,
- m.getPayloadLength());
+ clientMessage.getPayloadLength());
}
- Part regionNamePart = m.getPart(partCnt++);
- Part callbackArgumentPart = m.getPart(partCnt++);
+ int partCnt = 0;
+ Part regionNamePart = clientMessage.getPart(partCnt++);
+ Part callbackArgumentPart = clientMessage.getPart(partCnt++);
- Part hasCqsPart = m.getPart(partCnt++);
+ Part hasCqsPart = clientMessage.getPart(partCnt++);
regionName = regionNamePart.getString();
Object callbackArgument = callbackArgumentPart.getObject();
@@ -1076,17 +1082,18 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
logger.debug("Clearing region: {} callbackArgument: {}", regionName, callbackArgument);
}
- if (((Boolean) hasCqsPart.getObject()).booleanValue()) {
- Part numCqsPart = m.getPart(partCnt++);
+ if ((Boolean) hasCqsPart.getObject()) {
+ Part numCqsPart = clientMessage.getPart(partCnt++);
if (isDebugEnabled) {
logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}",
numCqsPart.getInt() / 2);
}
- partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), null, null);
+ partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(),
+ clientMessage.getMessageType(), null, null);
}
// Confirm that the region exists
- LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+ LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
if (region == null) {
if (isDebugEnabled && !quitting()) {
logger.debug("Region named {} does not exist", regionName);
@@ -1099,7 +1106,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
if (region.hasServerProxy()) {
// Locally clear the region
region.basicBridgeClientClear(callbackArgument,
- qManager.getState().getProcessedMarker() || !this.isDurableClient);
+ this.qManager.getState().getProcessedMarker() || !this.isDurableClient);
if (isDebugEnabled) {
logger.debug("Cleared region: {} callbackArgument: {}", regionName, callbackArgument);
@@ -1117,50 +1124,44 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
* Locally invalidate a region NOTE: Added as part of bug#38048. The code only takes care of CQ
* processing. Support needs to be added for local region invalidate.
*
- * @param m message describing the region to clear
+ * @param clientMessage message describing the region to clear
*/
- private void handleInvalidateRegion(Message m) {
+ private void handleInvalidateRegion(Message clientMessage) {
String regionName = null;
- int partCnt = 0;
-
final boolean isDebugEnabled = logger.isDebugEnabled();
+
try {
// Retrieve the data from the invalidate-region message parts
if (isDebugEnabled) {
logger.debug("{}: Received invalidate region message of length ({} bytes)", this,
- m.getPayloadLength());
+ clientMessage.getPayloadLength());
}
- Part regionNamePart = m.getPart(partCnt++);
+ int partCnt = 0;
+ Part regionNamePart = clientMessage.getPart(partCnt++);
partCnt++; // Part callbackArgumentPart = m.getPart(partCnt++);
- Part hasCqsPart = m.getPart(partCnt++);
+ Part hasCqsPart = clientMessage.getPart(partCnt++);
regionName = regionNamePart.getString();
- // Object callbackArgument = callbackArgumentPart.getObject();
- if (((Boolean) hasCqsPart.getObject()).booleanValue()) {
- Part numCqsPart = m.getPart(partCnt++);
+ if ((Boolean) hasCqsPart.getObject()) {
+ Part numCqsPart = clientMessage.getPart(partCnt++);
if (isDebugEnabled) {
logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}",
numCqsPart.getInt() / 2);
}
- partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), null, null);
+ // TODO: partCnt is unused
+ partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(),
+ clientMessage.getMessageType(), null, null);
}
// Confirm that the region exists
- LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+ LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
if (region == null) {
if (isDebugEnabled && !quitting()) {
logger.debug("Region named {} does not exist", regionName);
}
- return;
- }
-
- // Verify that the region in question should respond to this
- // message
- if (region.hasServerProxy()) {
- return;
}
} catch (Exception e) {
@@ -1174,40 +1175,39 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
/**
* Register instantiators locally
*
- * @param msg message describing the new instantiators
+ * @param clientMessage message describing the new instantiators
* @param eventId eventId of the instantiators
*/
- private void handleRegisterInstantiator(Message msg, EventID eventId) {
+ private void handleRegisterInstantiator(Message clientMessage, EventID eventId) {
String instantiatorClassName = null;
final boolean isDebugEnabled = logger.isDebugEnabled();
+
try {
- int noOfParts = msg.getNumberOfParts();
+ int noOfParts = clientMessage.getNumberOfParts();
if (isDebugEnabled) {
logger.debug("{}: Received register instantiators message of parts {}", getName(),
noOfParts);
}
+
Assert.assertTrue((noOfParts - 1) % 3 == 0);
- for (int i = 0; i < noOfParts - 1; i = i + 3) {
+ for (int i = 0; i < noOfParts - 1; i += 3) {
instantiatorClassName =
- (String) CacheServerHelper.deserialize(msg.getPart(i).getSerializedForm());
- String instantiatedClassName =
- (String) CacheServerHelper.deserialize(msg.getPart(i + 1).getSerializedForm());
- int id = msg.getPart(i + 2).getInt();
+ (String) CacheServerHelper.deserialize(clientMessage.getPart(i).getSerializedForm());
+ String instantiatedClassName = (String) CacheServerHelper
+ .deserialize(clientMessage.getPart(i + 1).getSerializedForm());
+ int id = clientMessage.getPart(i + 2).getInt();
InternalInstantiator.register(instantiatorClassName, instantiatedClassName, id, false,
- eventId, null/* context */);
- // distribute is false because we don't want to propagate this to
- // servers recursively
+ eventId, null);
+ // distribute is false because we don't want to propagate this to servers recursively
}
// CALLBACK TESTING PURPOSE ONLY
if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
- ClientServerObserver bo = ClientServerObserverHolder.getInstance();
- bo.afterReceivingFromServer(eventId);
+ ClientServerObserver clientServerObserver = ClientServerObserverHolder.getInstance();
+ clientServerObserver.afterReceivingFromServer(eventId);
}
- }
- // TODO bug: can the following catch be more specific?
- catch (Exception e) {
+ } catch (Exception e) {
if (isDebugEnabled) {
logger.debug("{}: Caught following exception while attempting to read Instantiator : {}",
this, instantiatorClassName, e);
@@ -1218,6 +1218,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
private void handleRegisterDataSerializer(Message msg, EventID eventId) {
Class dataSerializerClass = null;
final boolean isDebugEnabled = logger.isDebugEnabled();
+
try {
int noOfParts = msg.getNumberOfParts();
if (isDebugEnabled) {
@@ -1231,8 +1232,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
(String) CacheServerHelper.deserialize(msg.getPart(i).getSerializedForm());
int id = msg.getPart(i + 1).getInt();
InternalDataSerializer.register(dataSerializerClassName, false, eventId, null, id);
- // distribute is false because we don't want to propagate this to
- // servers recursively
+ // distribute is false because we don't want to propagate this to servers recursively
int numOfClasses = msg.getPart(i + 2).getInt();
int j = 0;
@@ -1241,7 +1241,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
(String) CacheServerHelper.deserialize(msg.getPart(i + 3 + j).getSerializedForm());
InternalDataSerializer.updateSupportedClassesMap(dataSerializerClassName, className);
}
- i = i + 3 + j;
+
+ i += 3 + j;
} catch (ClassNotFoundException e) {
if (isDebugEnabled) {
logger.debug(
@@ -1257,9 +1258,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
bo.afterReceivingFromServer(eventId);
}
- }
- // TODO bug: can the following catch be more specific?
- catch (Exception e) {
+ } catch (Exception e) {
if (isDebugEnabled) {
logger.debug("{}: Caught following exception while attempting to read DataSerializer : {}",
this, dataSerializerClass, e);
@@ -1270,93 +1269,87 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
/**
* Processes message to invoke CQ listeners.
*/
- private int processCqs(Message m, int startMessagePart, int numCqParts, int messageType,
- Object key, Object value) {
- return processCqs(m, startMessagePart, numCqParts, messageType, key, value, null,
- null/* eventId */);
+ private int processCqs(Message clientMessage, int startMessagePart, int numCqParts,
+ int messageType, Object key, Object value) {
+ return processCqs(clientMessage, startMessagePart, numCqParts, messageType, key, value, null,
+ null);
}
- private int processCqs(Message m, int startMessagePart, int numCqParts, int messageType,
- Object key, Object value, byte[] delta, EventID eventId) {
+ private int processCqs(Message clientMessage, int startMessagePart, int numCqParts,
+ int messageType, Object key, Object value, byte[] delta, EventID eventId) {
HashMap cqs = new HashMap();
final boolean isDebugEnabled = logger.isDebugEnabled();
for (int cqCnt = 0; cqCnt < numCqParts;) {
- StringBuilder str = null;
+ StringBuilder sb = null;
if (isDebugEnabled) {
- str = new StringBuilder(100);
- str.append("found these queries: ");
+ sb = new StringBuilder(100);
+ sb.append("found these queries: ");
}
try {
// Get CQ Name.
- Part cqNamePart = m.getPart(startMessagePart + (cqCnt++));
+ Part cqNamePart = clientMessage.getPart(startMessagePart + cqCnt++);
// Get CQ Op.
- Part cqOpPart = m.getPart(startMessagePart + (cqCnt++));
- cqs.put(cqNamePart.getString(), Integer.valueOf(cqOpPart.getInt()));
+ Part cqOpPart = clientMessage.getPart(startMessagePart + cqCnt++);
+ cqs.put(cqNamePart.getString(), cqOpPart.getInt());
- if (str != null) {
- str.append(cqNamePart.getString()).append(" op=").append(cqOpPart.getInt()).append(" ");
+ if (sb != null) {
+ sb.append(cqNamePart.getString()).append(" op=").append(cqOpPart.getInt()).append(" ");
}
- } catch (Exception ex) {
+ } catch (Exception ignore) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.CacheClientUpdater_ERROR_WHILE_PROCESSING_THE_CQ_MESSAGE_PROBLEM_WITH_READING_MESSAGE_FOR_CQ_0,
cqCnt));
}
- if (isDebugEnabled && str != null) {
- logger.debug(str);
+ if (isDebugEnabled) {
+ logger.debug(sb);
}
}
- {
- CqService cqService = this.cache.getCqService();
- try {
- cqService.dispatchCqListeners(cqs, messageType, key, value, delta, qManager, eventId);
- } catch (Exception ex) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.CacheClientUpdater_FAILED_TO_INVOKE_CQ_DISPATCHER_ERROR___0,
- ex.getMessage()));
- if (isDebugEnabled) {
- logger.debug("Failed to invoke CQ Dispatcher.", ex);
- }
+ CqService cqService = this.cache.getCqService();
+ try {
+ cqService.dispatchCqListeners(cqs, messageType, key, value, delta, this.qManager, eventId);
+ } catch (Exception ex) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.CacheClientUpdater_FAILED_TO_INVOKE_CQ_DISPATCHER_ERROR___0,
+ ex.getMessage()));
+ if (isDebugEnabled) {
+ logger.debug("Failed to invoke CQ Dispatcher.", ex);
}
}
- return (startMessagePart + numCqParts);
+ return startMessagePart + numCqParts;
}
- private void handleRegisterInterest(Message m) {
+ private void handleRegisterInterest(Message clientMessage) {
String regionName = null;
Object key = null;
- int interestType;
- byte interestResultPolicy;
- boolean isDurable;
- boolean receiveUpdatesAsInvalidates;
- int partCnt = 0;
-
final boolean isDebugEnabled = logger.isDebugEnabled();
+
try {
// Retrieve the data from the add interest message parts
if (isDebugEnabled) {
logger.debug("{}: Received add interest message of length ({} bytes)", this,
- m.getPayloadLength());
+ clientMessage.getPayloadLength());
}
- Part regionNamePart = m.getPart(partCnt++);
- Part keyPart = m.getPart(partCnt++);
- Part interestTypePart = m.getPart(partCnt++);
- Part interestResultPolicyPart = m.getPart(partCnt++);
- Part isDurablePart = m.getPart(partCnt++);
- Part receiveUpdatesAsInvalidatesPart = m.getPart(partCnt++);
+
+ int partCnt = 0;
+ Part regionNamePart = clientMessage.getPart(partCnt++);
+ Part keyPart = clientMessage.getPart(partCnt++);
+ Part interestTypePart = clientMessage.getPart(partCnt++);
+ Part interestResultPolicyPart = clientMessage.getPart(partCnt++);
+ Part isDurablePart = clientMessage.getPart(partCnt++);
+ Part receiveUpdatesAsInvalidatesPart = clientMessage.getPart(partCnt++);
regionName = regionNamePart.getString();
key = keyPart.getStringOrObject();
- interestType = ((Integer) interestTypePart.getObject()).intValue();
- interestResultPolicy = ((Byte) interestResultPolicyPart.getObject()).byteValue();
- isDurable = ((Boolean) isDurablePart.getObject()).booleanValue();
- receiveUpdatesAsInvalidates =
- ((Boolean) receiveUpdatesAsInvalidatesPart.getObject()).booleanValue();
+ int interestType = (Integer) interestTypePart.getObject();
+ byte interestResultPolicy = (Byte) interestResultPolicyPart.getObject();
+ boolean isDurable = (Boolean) isDurablePart.getObject();
+ boolean receiveUpdatesAsInvalidates = (Boolean) receiveUpdatesAsInvalidatesPart.getObject();
// Confirm that region exists
- LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+ LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
if (region == null) {
if (isDebugEnabled && !quitting()) {
logger.debug("{}: Region named {} does not exist", this, regionName);
@@ -1386,38 +1379,34 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
}
}
- private void handleUnregisterInterest(Message m) {
+ private void handleUnregisterInterest(Message clientMessage) {
String regionName = null;
Object key = null;
- int interestType;
- boolean isDurable;
- boolean receiveUpdatesAsInvalidates;
- int partCnt = 0;
-
final boolean isDebugEnabled = logger.isDebugEnabled();
+
try {
// Retrieve the data from the remove interest message parts
if (isDebugEnabled) {
logger.debug("{}: Received remove interest message of length ({} bytes)", this,
- m.getPayloadLength());
+ clientMessage.getPayloadLength());
}
- Part regionNamePart = m.getPart(partCnt++);
- Part keyPart = m.getPart(partCnt++);
- Part interestTypePart = m.getPart(partCnt++);
- Part isDurablePart = m.getPart(partCnt++);
- Part receiveUpdatesAsInvalidatesPart = m.getPart(partCnt++);
+ int partCnt = 0;
+ Part regionNamePart = clientMessage.getPart(partCnt++);
+ Part keyPart = clientMessage.getPart(partCnt++);
+ Part interestTypePart = clientMessage.getPart(partCnt++);
+ Part isDurablePart = clientMessage.getPart(partCnt++);
+ Part receiveUpdatesAsInvalidatesPart = clientMessage.getPart(partCnt++);
// Not reading the eventId part
regionName = regionNamePart.getString();
key = keyPart.getStringOrObject();
- interestType = ((Integer) interestTypePart.getObject()).intValue();
- isDurable = ((Boolean) isDurablePart.getObject()).booleanValue();
- receiveUpdatesAsInvalidates =
- ((Boolean) receiveUpdatesAsInvalidatesPart.getObject()).booleanValue();
+ int interestType = (Integer) interestTypePart.getObject();
+ boolean isDurable = (Boolean) isDurablePart.getObject();
+ boolean receiveUpdatesAsInvalidates = (Boolean) receiveUpdatesAsInvalidatesPart.getObject();
// Confirm that region exists
- LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+ LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
if (region == null) {
if (isDebugEnabled) {
logger.debug("{}: Region named {} does not exist", this, regionName);
@@ -1445,14 +1434,17 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
}
}
- private void handleTombstoneOperation(Message msg) {
+ private void handleTombstoneOperation(Message clientMessage) {
String regionName = "unknown";
+
try { // not sure why this isn't done by the caller
int partIdx = 0;
+
// see ClientTombstoneMessage.getGFE70Message
- regionName = msg.getPart(partIdx++).getString();
- int op = msg.getPart(partIdx++).getInt();
- LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+ regionName = clientMessage.getPart(partIdx++).getString();
+ int op = clientMessage.getPart(partIdx++).getInt();
+ LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
+
if (region == null) {
if (!quitting()) {
if (logger.isDebugEnabled()) {
@@ -1461,24 +1453,29 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
}
return;
}
+
if (logger.isDebugEnabled()) {
logger.debug("{}: Received tombstone operation for region {} with operation={}", this,
region, op);
}
+
if (!region.getConcurrencyChecksEnabled()) {
return;
}
+
switch (op) {
case 0:
Map<VersionSource, Long> regionGCVersions =
- (Map<VersionSource, Long>) msg.getPart(partIdx++).getObject();
- EventID eventID = (EventID) msg.getPart(partIdx++).getObject();
+ (Map<VersionSource, Long>) clientMessage.getPart(partIdx++).getObject();
+ EventID eventID = (EventID) clientMessage.getPart(partIdx++).getObject();
region.expireTombstones(regionGCVersions, eventID, null);
break;
+
case 1:
- Set<Object> removedKeys = (Set<Object>) msg.getPart(partIdx++).getObject();
+ Set<Object> removedKeys = (Set<Object>) clientMessage.getPart(partIdx++).getObject();
region.expireTombstoneKeys(removedKeys);
break;
+
default:
throw new IllegalArgumentException("unknown operation type " + op);
}
@@ -1494,22 +1491,21 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
*/
private boolean quitting() {
if (isInterrupted()) {
- // Any time an interrupt is thrown at this thread, regard it as a
- // request to terminate
+ // Any time an interrupt is thrown at this thread, regard it as a request to terminate
return true;
}
- if (!continueProcessing.get()) {
+ if (!this.continueProcessing.get()) {
// de facto flag indicating we are to stop
return true;
}
- if (cache != null && cache.getCancelCriterion().isCancelInProgress()) {
+ if (this.cache != null && this.cache.getCancelCriterion().isCancelInProgress()) {
// System is cancelling
return true;
}
// The pool stuff is really sick, so it's possible for us to have a distributed
// system that is not the same as our cache. Check it just in case...
- if (system.getCancelCriterion().isCancelInProgress()) {
+ if (this.system.getCancelCriterion().isCancelInProgress()) {
return true;
}
@@ -1531,15 +1527,15 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
this.failedUpdater.join(5000);
}
}
- } catch (InterruptedException ie) {
+ } catch (InterruptedException ignore) {
gotInterrupted = true;
- return; // just bail, because I have not done anything yet
+ // just bail, because I have not done anything yet
} finally {
if (!gotInterrupted && this.failedUpdater != null) {
logger.info(LocalizedMessage.create(
LocalizedStrings.CacheClientUpdater_0_HAS_COMPLETED_WAITING_FOR_1,
new Object[] {this, this.failedUpdater}));
- failedUpdater = null;
+ this.failedUpdater = null;
}
}
}
@@ -1548,6 +1544,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
* Processes messages received from the server.
*
* Only certain types of messages are handled.
+ *
+ * TODO: Method 'processMessages' is too complex to analyze by data flow algorithm
*
* @see MessageType#CLIENT_MARKER
* @see MessageType#LOCAL_CREATE
@@ -1558,11 +1556,11 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
* @see MessageType#CLEAR_REGION
* @see ClientUpdateMessage
*/
- protected void processMessages() {
+ private void processMessages() {
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
- Part eid = null;
- Message _message = initializeMessage();
+ Message clientMessage = initializeMessage();
+
if (quitting()) {
if (isDebugEnabled) {
logger.debug("processMessages quitting early because we have stopped");
@@ -1570,11 +1568,11 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
// our caller calls close which will notify all waiters for our init
return;
}
+
logger.info(LocalizedMessage
.create(LocalizedStrings.CacheClientUpdater_0_READY_TO_PROCESS_MESSAGES, this));
- while (continueProcessing.get()) {
- // SystemFailure.checkFailure(); dm will check this
+ while (this.continueProcessing.get()) {
if (quitting()) {
if (isDebugEnabled) {
logger.debug("termination detected");
@@ -1594,12 +1592,12 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
try {
// Read the message
- _message.recv();
+ clientMessage.recv();
// Wait for the previously failed cache client updater
// to finish. This will avoid out of order messages.
waitForFailedUpdater();
- cache.waitForRegisterInterestsInProgress();
+ this.cache.waitForRegisterInterestsInProgress();
if (quitting()) {
if (isDebugEnabled) {
logger.debug("processMessages quitting before processing message");
@@ -1608,7 +1606,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
}
// If the message is a ping, ignore it
- if (_message.getMessageType() == MessageType.SERVER_TO_CLIENT_PING) {
+ if (clientMessage.getMessageType() == MessageType.SERVER_TO_CLIENT_PING) {
if (isDebugEnabled) {
logger.debug("{}: Received ping", this);
}
@@ -1616,76 +1614,80 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
}
boolean isDeltaSent = false;
- boolean isCreateOrUpdate = _message.getMessageType() == MessageType.LOCAL_CREATE
- || _message.getMessageType() == MessageType.LOCAL_UPDATE;
+ boolean isCreateOrUpdate = clientMessage.getMessageType() == MessageType.LOCAL_CREATE
+ || clientMessage.getMessageType() == MessageType.LOCAL_UPDATE;
if (isCreateOrUpdate) {
- isDeltaSent = ((Boolean) _message.getPart(2).getObject()).booleanValue();
+ isDeltaSent = (Boolean) clientMessage.getPart(2).getObject();
}
// extract the eventId and verify if it is a duplicate event
// if it is a duplicate event, ignore
// @since GemFire 5.1
- int numberOfParts = _message.getNumberOfParts();
- eid = _message.getPart(numberOfParts - 1);
+ int numberOfParts = clientMessage.getNumberOfParts();
+ Part eid = clientMessage.getPart(numberOfParts - 1);
+
// TODO the message handling methods also deserialized the eventID - inefficient
EventID eventId = (EventID) eid.getObject();
// no need to verify if the instantiator msg is duplicate or not
- if (_message.getMessageType() != MessageType.REGISTER_INSTANTIATORS
- && _message.getMessageType() != MessageType.REGISTER_DATASERIALIZERS) {
+ if (clientMessage.getMessageType() != MessageType.REGISTER_INSTANTIATORS
+ && clientMessage.getMessageType() != MessageType.REGISTER_DATASERIALIZERS) {
if (this.qManager.getState().verifyIfDuplicate(eventId,
!(this.isDurableClient || isDeltaSent))) {
continue;
}
}
+
if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) {
- logger.trace(LogMarker.BRIDGE_SERVER,
- "Processing event with id {}" + eventId.expensiveToString());
+ logger.trace(LogMarker.BRIDGE_SERVER, "Processing event with id {}",
+ eventId.expensiveToString());
}
+
this.isOpCompleted = true;
+
// Process the message
- switch (_message.getMessageType()) {
+ switch (clientMessage.getMessageType()) {
case MessageType.LOCAL_CREATE:
case MessageType.LOCAL_UPDATE:
- handleUpdate(_message);
+ handleUpdate(clientMessage);
break;
case MessageType.LOCAL_INVALIDATE:
- handleInvalidate(_message);
+ handleInvalidate(clientMessage);
break;
case MessageType.LOCAL_DESTROY:
- handleDestroy(_message);
+ handleDestroy(clientMessage);
break;
case MessageType.LOCAL_DESTROY_REGION:
- handleDestroyRegion(_message);
+ handleDestroyRegion(clientMessage);
break;
case MessageType.CLEAR_REGION:
- handleClearRegion(_message);
+ handleClearRegion(clientMessage);
break;
case MessageType.REGISTER_INSTANTIATORS:
- handleRegisterInstantiator(_message, eventId);
+ handleRegisterInstantiator(clientMessage, eventId);
break;
case MessageType.REGISTER_DATASERIALIZERS:
- handleRegisterDataSerializer(_message, eventId);
+ handleRegisterDataSerializer(clientMessage, eventId);
break;
case MessageType.CLIENT_MARKER:
- handleMarker(_message);
+ handleMarker(clientMessage);
break;
case MessageType.INVALIDATE_REGION:
- handleInvalidateRegion(_message);
+ handleInvalidateRegion(clientMessage);
break;
case MessageType.CLIENT_REGISTER_INTEREST:
- handleRegisterInterest(_message);
+ handleRegisterInterest(clientMessage);
break;
case MessageType.CLIENT_UNREGISTER_INTEREST:
- handleUnregisterInterest(_message);
+ handleUnregisterInterest(clientMessage);
break;
case MessageType.TOMBSTONE_OPERATION:
- handleTombstoneOperation(_message);
+ handleTombstoneOperation(clientMessage);
break;
default:
logger.warn(LocalizedMessage.create(
LocalizedStrings.CacheClientUpdater_0_RECEIVED_AN_UNSUPPORTED_MESSAGE_TYPE_1,
- new Object[] {this, MessageType.getString(_message.getMessageType())}));
+ new Object[] {this, MessageType.getString(clientMessage.getMessageType())}));
break;
}
@@ -1700,7 +1702,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
// likely to send pings...
// and the ClientHealthMonitor will cause a disconnect
- } catch (InterruptedIOException e) {
+ } catch (InterruptedIOException ignore) {
// Per Sun's support web site, this exception seems to be peculiar
// to Solaris, and may eventually not even be generated there.
//
@@ -1708,62 +1710,59 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
// isInterrupted() is false. (How very odd!)
//
// We regard it the same as an InterruptedException
- this.endPointDied = true;
- continueProcessing.set(false);// = false;
+ this.continueProcessing.set(false);
if (isDebugEnabled) {
logger.debug("InterruptedIOException");
}
+
} catch (IOException e) {
- this.endPointDied = true;
// Either the server went away, or we caught a closing condition.
if (!quitting()) {
// Server departed; print a message.
- String message = ": Caught the following exception and will exit: ";
- String errMessage = e.getMessage();
- if (errMessage == null) {
- errMessage = "";
- }
- ClientServerObserver bo = ClientServerObserverHolder.getInstance();
- bo.beforeFailoverByCacheClientUpdater(this.location);
- eManager.serverCrashed(this.endpoint);
+ ClientServerObserver clientServerObserver = ClientServerObserverHolder.getInstance();
+ clientServerObserver.beforeFailoverByCacheClientUpdater(this.location);
+ this.eManager.serverCrashed(this.endpoint);
if (isDebugEnabled) {
- logger.debug("" + message + e);
+ logger.debug("Caught the following exception and will exit", e);
}
} // !quitting
// In any event, terminate this thread.
- continueProcessing.set(false);// = false;
+ this.continueProcessing.set(false);
if (isDebugEnabled) {
logger.debug("terminated due to IOException");
}
+
} catch (Exception e) {
if (!quitting()) {
- this.endPointDied = true;
- ClientServerObserver bo = ClientServerObserverHolder.getInstance();
- bo.beforeFailoverByCacheClientUpdater(this.location);
- eManager.serverCrashed(this.endpoint);
+ ClientServerObserver clientServerObserver = ClientServerObserverHolder.getInstance();
+ clientServerObserver.beforeFailoverByCacheClientUpdater(this.location);
+ this.eManager.serverCrashed(this.endpoint);
String message = ": Caught the following exception and will exit: ";
handleException(message, e);
}
+
// In any event, terminate this thread.
- continueProcessing.set(false);// = false; // force termination
+ this.continueProcessing.set(false);// = false; // force termination
if (isDebugEnabled) {
logger.debug("CCU terminated due to Exception");
}
+
} finally {
- _message.clear();
+ clientMessage.clear();
}
} // while
+
} finally {
if (isDebugEnabled) {
logger.debug("has stopped and cleaning the helper ..");
}
- this.close(); // added to fixes some race conditions associated with 38382
+ close(); // added to fix some race conditions associated with 38382
// this will make sure that if this thread dies without starting QueueMgr then it will start..
// 1. above we ignore InterruptedIOException and this thread dies without informing QueueMgr
- // 2. if there is some other race codition with continueProcessing flag
- this.qManager.checkEndpoint(this, endpoint);
+ // 2. if there is some other race condition with continueProcessing flag
+ this.qManager.checkEndpoint(this, this.endpoint);
}
}
@@ -1796,12 +1795,11 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
*/
private Object deserialize(byte[] serializedBytes) {
Object deserializedObject = serializedBytes;
- // This is a debugging method so ignore all exceptions like
- // ClassNotFoundException
+ // This is a debugging method so ignore all exceptions like ClassNotFoundException
try {
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(serializedBytes));
deserializedObject = DataSerializer.readObject(dis);
- } catch (Exception e) {
+ } catch (ClassNotFoundException | IOException ignore) {
}
return deserializedObject;
}
@@ -1810,18 +1808,14 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
* @return the local port of our {@link #socket}
*/
protected int getLocalPort() {
- return socket.getLocalPort();
+ return this.socket.getLocalPort();
}
+ @Override
public void onDisconnect(InternalDistributedSystem sys) {
stopUpdater();
}
- /**
- * true if the EndPoint represented by this updater thread has died.
- */
- private volatile boolean endPointDied = false;
-
private void verifySocketBufferSize(int requestedBufferSize, int actualBufferSize, String type) {
if (actualBufferSize < requestedBufferSize) {
logger.info(LocalizedMessage.create(
@@ -1837,11 +1831,11 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
* @since GemFire 5.7
*/
public static class CCUStats implements MessageStats {
- // static fields
+
private static final StatisticsType type;
- private final static int messagesBeingReceivedId;
- private final static int messageBytesBeingReceivedId;
- private final static int receivedBytesId;
+ private static final int messagesBeingReceivedId;
+ private static final int messageBytesBeingReceivedId;
+ private static final int receivedBytesId;
static {
StatisticsTypeFactory f = StatisticsTypeFactoryImpl.singleton();
@@ -1863,7 +1857,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
// instance fields
private final Statistics stats;
- public CCUStats(DistributedSystem ids, ServerLocation location) {
+ CCUStats(DistributedSystem ids, ServerLocation location) {
// no need for atomic since only a single thread will be writing these
this.stats = ids.createStatistics(type, "CacheClientUpdater-" + location);
}
@@ -1872,25 +1866,29 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
this.stats.close();
}
+ @Override
public void incReceivedBytes(long v) {
this.stats.incLong(receivedBytesId, v);
}
+ @Override
public void incSentBytes(long v) {
// noop since we never send messages
}
+ @Override
public void incMessagesBeingReceived(int bytes) {
- stats.incInt(messagesBeingReceivedId, 1);
+ this.stats.incInt(messagesBeingReceivedId, 1);
if (bytes > 0) {
- stats.incLong(messageBytesBeingReceivedId, bytes);
+ this.stats.incLong(messageBytesBeingReceivedId, bytes);
}
}
+ @Override
public void decMessagesBeingReceived(int bytes) {
- stats.incInt(messagesBeingReceivedId, -1);
+ this.stats.incInt(messagesBeingReceivedId, -1);
if (bytes > 0) {
- stats.incLong(messageBytesBeingReceivedId, -bytes);
+ this.stats.incLong(messageBytesBeingReceivedId, -bytes);
}
}
@@ -1904,7 +1902,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
}
}
+ @Override
public boolean isProcessing() {
- return continueProcessing.get();
+ return this.continueProcessing.get();
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
index 2a5a3d7..39c2f3a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
@@ -22,7 +22,6 @@ import org.apache.geode.internal.logging.LogService;
import java.io.EOFException;
import java.io.IOException;
-import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import org.apache.logging.log4j.Logger;
@@ -36,7 +35,7 @@ import org.apache.logging.log4j.Logger;
*
* <PRE>
*
- * msgType - int - 4 bytes type of message, types enumerated below
+ * messageType - int - 4 bytes type of message, types enumerated below
*
* numberOfParts - int - 4 bytes number of elements (LEN-BYTE* pairs) contained
* in the payload. Message can be a multi-part message
@@ -153,7 +152,8 @@ public class ChunkedMessage extends Message {
public void setLastChunkAndNumParts(boolean lastChunk, int numParts) {
setLastChunk(lastChunk);
- if (this.sc != null && this.sc.getClientVersion().compareTo(Version.GFE_65) >= 0) {
+ if (this.serverConnection != null
+ && this.serverConnection.getClientVersion().compareTo(Version.GFE_65) >= 0) {
// we us e three bits for number of parts in last chunk byte
// we us e three bits for number of parts in last chunk byte
byte localLastChunk = (byte) (numParts << 5);
@@ -162,7 +162,7 @@ public class ChunkedMessage extends Message {
}
public void setServerConnection(ServerConnection servConn) {
- if (this.sc != servConn)
+ if (this.serverConnection != servConn)
throw new IllegalStateException("this.sc was not correctly set");
}
@@ -209,7 +209,7 @@ public class ChunkedMessage extends Message {
// Set the header and payload fields only after receiving all the
// socket data, providing better message consistency in the face
// of exceptional conditions (e.g. IO problems, timeouts etc.)
- this.msgType = type;
+ this.messageType = type;
this.numberOfParts = numParts; // Already set in setPayloadFields via setNumberOfParts
this.transactionId = txid;
}
@@ -241,14 +241,15 @@ public class ChunkedMessage extends Message {
int totalBytesRead = 0;
do {
int bytesRead = 0;
- bytesRead = is.read(cb.array(), totalBytesRead, CHUNK_HEADER_LENGTH - totalBytesRead);
+ bytesRead =
+ inputStream.read(cb.array(), totalBytesRead, CHUNK_HEADER_LENGTH - totalBytesRead);
if (bytesRead == -1) {
throw new EOFException(
LocalizedStrings.ChunkedMessage_CHUNK_READ_ERROR_CONNECTION_RESET.toLocalizedString());
}
totalBytesRead += bytesRead;
- if (this.msgStats != null) {
- this.msgStats.incReceivedBytes(bytesRead);
+ if (this.messageStats != null) {
+ this.messageStats.incReceivedBytes(bytesRead);
}
} while (totalBytesRead < CHUNK_HEADER_LENGTH);
@@ -315,7 +316,7 @@ public class ChunkedMessage extends Message {
* Sends a chunk of this message.
*/
public void sendChunk(ServerConnection servConn) throws IOException {
- if (this.sc != servConn)
+ if (this.serverConnection != servConn)
throw new IllegalStateException("this.sc was not correctly set");
sendChunk();
}
@@ -355,7 +356,7 @@ public class ChunkedMessage extends Message {
protected void getHeaderBytesForWrite() {
final ByteBuffer cb = getCommBuffer();
cb.clear();
- cb.putInt(this.msgType);
+ cb.putInt(this.messageType);
cb.putInt(this.numberOfParts);
cb.putInt(this.transactionId);