You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/23 02:23:21 UTC
[06/22] geode git commit: Cleanup BaseCommand
http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put65.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put65.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put65.java
index d53c89e..581aec6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put65.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put65.java
@@ -63,7 +63,7 @@ public class Put65 extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long p_start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long p_start)
throws IOException, InterruptedException {
long start = p_start;
Part regionNamePart = null, keyPart = null, valuePart = null, callbackArgPart = null;
@@ -72,11 +72,11 @@ public class Put65 extends BaseCommand {
Part eventPart = null;
StringBuffer errMessage = new StringBuffer();
boolean isDelta = false;
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- CacheServerStats stats = servConn.getCacheServerStats();
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
// requiresResponse = true;
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
{
long oldStart = start;
start = DistributionStats.getStatTime();
@@ -84,50 +84,50 @@ public class Put65 extends BaseCommand {
}
// Retrieve the data from the message parts
int idx = 0;
- regionNamePart = msg.getPart(idx++);
+ regionNamePart = clientMessage.getPart(idx++);
Operation operation;
try {
- operation = (Operation) msg.getPart(idx++).getObject();
+ operation = (Operation) clientMessage.getPart(idx++).getObject();
if (operation == null) { // native clients send a null since the op is java-serialized
operation = Operation.UPDATE;
}
} catch (ClassNotFoundException e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- int flags = msg.getPart(idx++).getInt();
+ int flags = clientMessage.getPart(idx++).getInt();
boolean requireOldValue = ((flags & 0x01) == 0x01);
boolean haveExpectedOldValue = ((flags & 0x02) == 0x02);
Object expectedOldValue = null;
if (haveExpectedOldValue) {
try {
- expectedOldValue = msg.getPart(idx++).getObject();
+ expectedOldValue = clientMessage.getPart(idx++).getObject();
} catch (ClassNotFoundException e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
- keyPart = msg.getPart(idx++);
+ keyPart = clientMessage.getPart(idx++);
try {
- isDelta = ((Boolean) msg.getPart(idx).getObject()).booleanValue();
+ isDelta = ((Boolean) clientMessage.getPart(idx).getObject()).booleanValue();
idx += 1;
} catch (Exception e) {
- writeException(msg, MessageType.PUT_DELTA_ERROR, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, MessageType.PUT_DELTA_ERROR, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
// CachePerfStats not available here.
return;
}
- valuePart = msg.getPart(idx++);
- eventPart = msg.getPart(idx++);
- if (msg.getNumberOfParts() > idx) {
- callbackArgPart = msg.getPart(idx++);
+ valuePart = clientMessage.getPart(idx++);
+ eventPart = clientMessage.getPart(idx++);
+ if (clientMessage.getNumberOfParts() > idx) {
+ callbackArgPart = clientMessage.getPart(idx++);
try {
callbackArg = callbackArgPart.getObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
@@ -136,8 +136,8 @@ public class Put65 extends BaseCommand {
try {
key = keyPart.getStringOrObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -145,8 +145,8 @@ public class Put65 extends BaseCommand {
if (isDebugEnabled) {
logger.debug(
"{}: Received {}put request ({} bytes) from {} for region {} key {} txId {} posdup: {}",
- servConn.getName(), (isDelta ? " delta " : " "), msg.getPayloadLength(),
- servConn.getSocketString(), regionName, key, msg.getTransactionId(), msg.isRetry());
+ serverConnection.getName(), (isDelta ? " delta " : " "), clientMessage.getPayloadLength(),
+ serverConnection.getSocketString(), regionName, key, clientMessage.getTransactionId(), clientMessage.isRetry());
}
// Process the put request
@@ -154,27 +154,27 @@ public class Put65 extends BaseCommand {
if (key == null) {
String putMsg = " The input key for the put request is null";
if (isDebugEnabled) {
- logger.debug("{}:{}", servConn.getName(), putMsg);
+ logger.debug("{}:{}", serverConnection.getName(), putMsg);
}
errMessage.append(putMsg);
}
if (regionName == null) {
String putMsg = " The input region name for the put request is null";
if (isDebugEnabled) {
- logger.debug("{}:{}", servConn.getName(), putMsg);
+ logger.debug("{}:{}", serverConnection.getName(), putMsg);
}
errMessage.append(putMsg);
}
- writeErrorResponse(msg, MessageType.PUT_DATA_ERROR, errMessage.toString(), servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.PUT_DATA_ERROR, errMessage.toString(), serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName);
+ LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName);
if (region == null) {
String reason = " was not found during put request";
- writeRegionDestroyedEx(msg, regionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -182,11 +182,11 @@ public class Put65 extends BaseCommand {
// Invalid to 'put' a null value in an existing key
String putMsg = " Attempted to put a null value for existing key " + key;
if (isDebugEnabled) {
- logger.debug("{}:{}", servConn.getName(), putMsg);
+ logger.debug("{}:{}", serverConnection.getName(), putMsg);
}
errMessage.append(putMsg);
- writeErrorResponse(msg, MessageType.PUT_DATA_ERROR, errMessage.toString(), servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.PUT_DATA_ERROR, errMessage.toString(), serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -195,12 +195,12 @@ public class Put65 extends BaseCommand {
long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
EventIDHolder clientEvent =
- new EventIDHolder(new EventID(servConn.getEventMemberIDByteArray(), threadId, sequenceId));
+ new EventIDHolder(new EventID(serverConnection.getEventMemberIDByteArray(), threadId, sequenceId));
Breadcrumbs.setEventId(clientEvent.getEventId());
// msg.isRetry might be set by v7.0 and later clients
- if (msg.isRetry()) {
+ if (clientMessage.isRetry()) {
// if (logger.isDebugEnabled()) {
// logger.debug("DEBUG: encountered isRetry in Put65");
// }
@@ -226,13 +226,13 @@ public class Put65 extends BaseCommand {
}
boolean isObject = valuePart.isObject();
boolean isMetaRegion = region.isUsedForMetaRegion();
- msg.setMetaRegion(isMetaRegion);
+ clientMessage.setMetaRegion(isMetaRegion);
this.securityService.authorizeRegionWrite(regionName, key.toString());
AuthorizeRequest authzRequest = null;
if (!isMetaRegion) {
- authzRequest = servConn.getAuthzRequest();
+ authzRequest = serverConnection.getAuthzRequest();
}
if (authzRequest != null) {
if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
@@ -257,7 +257,7 @@ public class Put65 extends BaseCommand {
// to be publicly accessible.
if (operation == Operation.PUT_IF_ABSENT) {
// try {
- if (msg.isRetry() && clientEvent.getVersionTag() != null) {
+ if (clientMessage.isRetry() && clientEvent.getVersionTag() != null) {
// bug #46590 the operation was successful the last time since it
// was applied to the cache, so return success and the recovered
// version tag
@@ -267,16 +267,16 @@ public class Put65 extends BaseCommand {
}
// invoke basicBridgePutIfAbsent anyway to ensure that the event is distributed to all
// servers - bug #51664
- region.basicBridgePutIfAbsent(key, value, isObject, callbackArg, servConn.getProxyID(),
+ region.basicBridgePutIfAbsent(key, value, isObject, callbackArg, serverConnection.getProxyID(),
true, clientEvent);
oldValue = null;
} else {
oldValue = region.basicBridgePutIfAbsent(key, value, isObject, callbackArg,
- servConn.getProxyID(), true, clientEvent);
+ serverConnection.getProxyID(), true, clientEvent);
}
sendOldValue = true;
oldValueIsObject = true;
- Version clientVersion = servConn.getClientVersion();
+ Version clientVersion = serverConnection.getClientVersion();
if (oldValue instanceof CachedDeserializable) {
oldValue = ((CachedDeserializable) oldValue).getSerializedValue();
} else if (oldValue instanceof byte[]) {
@@ -299,7 +299,7 @@ public class Put65 extends BaseCommand {
} else if (operation == Operation.REPLACE) {
// try {
if (requireOldValue) { // <V> replace(<K>, <V>)
- if (msg.isRetry() && clientEvent.isConcurrencyConflict()
+ if (clientMessage.isRetry() && clientEvent.isConcurrencyConflict()
&& clientEvent.getVersionTag() != null) {
if (isDebugEnabled) {
logger.debug("replace(k,v) operation was successful last time with version {}",
@@ -307,10 +307,10 @@ public class Put65 extends BaseCommand {
}
}
oldValue = region.basicBridgeReplace(key, value, isObject, callbackArg,
- servConn.getProxyID(), true, clientEvent);
+ serverConnection.getProxyID(), true, clientEvent);
sendOldValue = !clientEvent.isConcurrencyConflict();
oldValueIsObject = true;
- Version clientVersion = servConn.getClientVersion();
+ Version clientVersion = serverConnection.getClientVersion();
if (oldValue instanceof CachedDeserializable) {
oldValue = ((CachedDeserializable) oldValue).getSerializedValue();
} else if (oldValue instanceof byte[]) {
@@ -330,8 +330,8 @@ public class Put65 extends BaseCommand {
} else { // boolean replace(<K>, <V>, <V>) {
boolean didPut;
didPut = region.basicBridgeReplace(key, expectedOldValue, value, isObject, callbackArg,
- servConn.getProxyID(), true, clientEvent);
- if (msg.isRetry() && clientEvent.getVersionTag() != null) {
+ serverConnection.getProxyID(), true, clientEvent);
+ if (clientMessage.isRetry() && clientEvent.getVersionTag() != null) {
if (isDebugEnabled) {
logger.debug("replace(k,v,v) operation was successful last time with version {}",
clientEvent.getVersionTag());
@@ -356,9 +356,9 @@ public class Put65 extends BaseCommand {
// Create the null entry. Since the value is null, the value of the
// isObject argument (the true passed after null)
// doesn't matter and is not used.
- result = region.basicBridgeCreate(key, null, true, callbackArg, servConn.getProxyID(), true,
+ result = region.basicBridgeCreate(key, null, true, callbackArg, serverConnection.getProxyID(), true,
clientEvent, false);
- if (msg.isRetry() && clientEvent.isConcurrencyConflict()
+ if (clientMessage.isRetry() && clientEvent.isConcurrencyConflict()
&& clientEvent.getVersionTag() != null) {
result = true;
if (isDebugEnabled) {
@@ -372,16 +372,16 @@ public class Put65 extends BaseCommand {
if (isDelta) {
delta = valuePart.getSerializedForm();
}
- TXManagerImpl txMgr = (TXManagerImpl) servConn.getCache().getCacheTransactionManager();
+ TXManagerImpl txMgr = (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager();
// bug 43068 - use create() if in a transaction and op is CREATE
if (txMgr.getTXState() != null && operation.isCreate()) {
result = region.basicBridgeCreate(key, (byte[]) value, isObject, callbackArg,
- servConn.getProxyID(), true, clientEvent, true);
+ serverConnection.getProxyID(), true, clientEvent, true);
} else {
result = region.basicBridgePut(key, value, delta, isObject, callbackArg,
- servConn.getProxyID(), true, clientEvent);
+ serverConnection.getProxyID(), true, clientEvent);
}
- if (msg.isRetry() && clientEvent.isConcurrencyConflict()
+ if (clientMessage.isRetry() && clientEvent.isConcurrencyConflict()
&& clientEvent.getVersionTag() != null) {
if (isDebugEnabled) {
logger.debug("put(k,v) operation was successful last time with version {}",
@@ -391,46 +391,46 @@ public class Put65 extends BaseCommand {
}
}
if (result) {
- servConn.setModificationInfo(true, regionName, key);
+ serverConnection.setModificationInfo(true, regionName, key);
} else {
- String message = servConn.getName() + ": Failed to put entry for region " + regionName
- + " key " + key + " value " + valuePart;
+ String message = serverConnection.getName() + ": Failed to put entry for region " + regionName
+ + " key " + key + " value " + valuePart;
if (isDebugEnabled) {
logger.debug(message);
}
throw new Exception(message);
}
} catch (RegionDestroyedException rde) {
- writeException(msg, rde, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, rde, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
} catch (ResourceException re) {
- writeException(msg, re, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, re, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
} catch (InvalidDeltaException ide) {
logger.info(LocalizedMessage.create(
LocalizedStrings.UpdateOperation_ERROR_APPLYING_DELTA_FOR_KEY_0_OF_REGION_1,
new Object[] {key, regionName}));
- writeException(msg, MessageType.PUT_DELTA_ERROR, ide, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, MessageType.PUT_DELTA_ERROR, ide, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
region.getCachePerfStats().incDeltaFullValuesRequested();
return;
} catch (Exception ce) {
// If an interrupted exception is thrown, rethrow it
- checkForInterrupt(servConn, ce);
+ checkForInterrupt(serverConnection, ce);
// If an exception occurs during the put, preserve the connection
- writeException(msg, ce, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ce, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
if (ce instanceof GemFireSecurityException) {
// Fine logging for security exceptions since these are already
// logged by the security logger
if (isDebugEnabled) {
- logger.debug("{}: Unexpected Security exception", servConn.getName(), ce);
+ logger.debug("{}: Unexpected Security exception", serverConnection.getName(), ce);
}
} else if (isDebugEnabled) {
- logger.debug("{}: Unexpected Exception", servConn.getName(), ce);
+ logger.debug("{}: Unexpected Exception", serverConnection.getName(), ce);
}
return;
} finally {
@@ -443,21 +443,21 @@ public class Put65 extends BaseCommand {
if (region instanceof PartitionedRegion) {
PartitionedRegion pr = (PartitionedRegion) region;
if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) {
- writeReplyWithRefreshMetadata(msg, servConn, pr, sendOldValue, oldValueIsObject, oldValue,
+ writeReplyWithRefreshMetadata(clientMessage, serverConnection, pr, sendOldValue, oldValueIsObject, oldValue,
pr.getNetworkHopType(), clientEvent.getVersionTag());
pr.clearNetworkHopData();
} else {
- writeReply(msg, servConn, sendOldValue, oldValueIsObject, oldValue,
+ writeReply(clientMessage, serverConnection, sendOldValue, oldValueIsObject, oldValue,
clientEvent.getVersionTag());
}
} else {
- writeReply(msg, servConn, sendOldValue, oldValueIsObject, oldValue,
+ writeReply(clientMessage, serverConnection, sendOldValue, oldValueIsObject, oldValue,
clientEvent.getVersionTag());
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
if (isDebugEnabled) {
logger.debug("{}: Sent put response back to {} for region {} key {} value {}",
- servConn.getName(), servConn.getSocketString(), regionName, key, valuePart);
+ serverConnection.getName(), serverConnection.getSocketString(), regionName, key, valuePart);
}
stats.incWritePutResponseTime(DistributionStats.getStatTime() - start);
@@ -471,7 +471,7 @@ public class Put65 extends BaseCommand {
replyMsg.setMessageType(MessageType.REPLY);
replyMsg.setNumberOfParts(sendOldValue ? 3 : 1);
replyMsg.setTransactionId(origMsg.getTransactionId());
- replyMsg.addBytesPart(OK_BYTES);
+ replyMsg.addBytesPart(okBytes());
if (sendOldValue) {
replyMsg.addIntPart(oldValueIsObject ? 1 : 0);
replyMsg.addObjPart(oldValue);
@@ -499,7 +499,7 @@ public class Put65 extends BaseCommand {
replyMsg.send(servConn);
pr.getPrStats().incPRMetaDataSentCount();
if (logger.isTraceEnabled()) {
- logger.trace("{}: rpl with REFRESH_METADAT tx: {} parts={}", servConn.getName(),
+ logger.trace("{}: rpl with REFRESH_METADATA tx: {} parts={}", servConn.getName(),
origMsg.getTransactionId(), replyMsg.getNumberOfParts());
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put70.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put70.java
index 38eb7ef..395dbce 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put70.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put70.java
@@ -62,7 +62,7 @@ public class Put70 extends Put65 {
}
replyMsg.setNumberOfParts(parts);
replyMsg.setTransactionId(origMsg.getTransactionId());
- replyMsg.addBytesPart(OK_BYTES);
+ replyMsg.addBytesPart(okBytes());
replyMsg.addIntPart(flags);
if (sendOldValue) {
replyMsg.addObjPart(oldValue);
@@ -114,7 +114,7 @@ public class Put70 extends Put65 {
replyMsg.send(servConn);
pr.getPrStats().incPRMetaDataSentCount();
if (logger.isTraceEnabled()) {
- logger.trace("{}: rpl with REFRESH_METADAT tx: {} parts={}", servConn.getName(),
+ logger.trace("{}: rpl with REFRESH_METADATA tx: {} parts={}", servConn.getName(),
origMsg.getTransactionId(), replyMsg.getNumberOfParts());
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutAll.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutAll.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutAll.java
index 0bcfd1b..281f737 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutAll.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutAll.java
@@ -59,7 +59,7 @@ public class PutAll extends BaseCommand {
private PutAll() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, InterruptedException {
Part regionNamePart = null, numberOfKeysPart = null, keyPart = null, valuePart = null;
String regionName = null;
@@ -67,12 +67,12 @@ public class PutAll extends BaseCommand {
Object key = null;
Part eventPart = null;
StringBuffer errMessage = new StringBuffer();
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- CacheServerStats stats = servConn.getCacheServerStats();
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
boolean replyWithMetaData = false;
// requiresResponse = true;
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
{
long oldStart = start;
start = DistributionStats.getStatTime();
@@ -82,64 +82,64 @@ public class PutAll extends BaseCommand {
try {
// Retrieve the data from the message parts
// part 0: region name
- regionNamePart = msg.getPart(0);
+ regionNamePart = clientMessage.getPart(0);
regionName = regionNamePart.getString();
if (regionName == null) {
String putAllMsg =
LocalizedStrings.PutAll_THE_INPUT_REGION_NAME_FOR_THE_PUTALL_REQUEST_IS_NULL
.toLocalizedString();
- logger.warn("{}: {}", servConn.getName(), putAllMsg);
+ logger.warn("{}: {}", serverConnection.getName(), putAllMsg);
errMessage.append(putAllMsg);
- writeErrorResponse(msg, MessageType.PUT_DATA_ERROR, errMessage.toString(), servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.PUT_DATA_ERROR, errMessage.toString(), serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
LocalRegion region = (LocalRegion) crHelper.getRegion(regionName);
if (region == null) {
String reason = " was not found during put request";
- writeRegionDestroyedEx(msg, regionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
// part 1: eventID
- eventPart = msg.getPart(1);
+ eventPart = clientMessage.getPart(1);
ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm());
long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
- EventID eventId = new EventID(servConn.getEventMemberIDByteArray(), threadId, sequenceId);
+ EventID eventId = new EventID(serverConnection.getEventMemberIDByteArray(), threadId, sequenceId);
// part 2: number of keys
- numberOfKeysPart = msg.getPart(2);
+ numberOfKeysPart = clientMessage.getPart(2);
numberOfKeys = numberOfKeysPart.getInt();
// building the map
Map map = new LinkedHashMap();
// Map isObjectMap = new LinkedHashMap();
for (int i = 0; i < numberOfKeys; i++) {
- keyPart = msg.getPart(3 + i * 2);
+ keyPart = clientMessage.getPart(3 + i * 2);
key = keyPart.getStringOrObject();
if (key == null) {
String putAllMsg =
LocalizedStrings.PutAll_ONE_OF_THE_INPUT_KEYS_FOR_THE_PUTALL_REQUEST_IS_NULL
.toLocalizedString();
- logger.warn("{}: {}", servConn.getName(), putAllMsg);
+ logger.warn("{}: {}", serverConnection.getName(), putAllMsg);
errMessage.append(putAllMsg);
- writeErrorResponse(msg, MessageType.PUT_DATA_ERROR, errMessage.toString(), servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.PUT_DATA_ERROR, errMessage.toString(), serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- valuePart = msg.getPart(3 + i * 2 + 1);
+ valuePart = clientMessage.getPart(3 + i * 2 + 1);
if (valuePart.isNull()) {
String putAllMsg =
LocalizedStrings.PutAll_ONE_OF_THE_INPUT_VALUES_FOR_THE_PUTALL_REQUEST_IS_NULL
.toLocalizedString();
- logger.warn("{}: {}", servConn.getName(), putAllMsg);
+ logger.warn("{}: {}", serverConnection.getName(), putAllMsg);
errMessage.append(putAllMsg);
- writeErrorResponse(msg, MessageType.PUT_DATA_ERROR, errMessage.toString(), servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.PUT_DATA_ERROR, errMessage.toString(), serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -155,15 +155,15 @@ public class PutAll extends BaseCommand {
// isObjectMap.put(key, new Boolean(isObject));
} // for
- if (msg.getNumberOfParts() == (3 + 2 * numberOfKeys + 1)) {// it means optional timeout has
+ if (clientMessage.getNumberOfParts() == (3 + 2 * numberOfKeys + 1)) {// it means optional timeout has
// been added
- int timeout = msg.getPart(3 + 2 * numberOfKeys).getInt();
- servConn.setRequestSpecificTimeout(timeout);
+ int timeout = clientMessage.getPart(3 + 2 * numberOfKeys).getInt();
+ serverConnection.setRequestSpecificTimeout(timeout);
}
this.securityService.authorizeRegionWrite(regionName);
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
authzRequest.createRegionAuthorize(regionName);
@@ -179,41 +179,41 @@ public class PutAll extends BaseCommand {
if (logger.isDebugEnabled()) {
logger.debug("{}: Received putAll request ({} bytes) from {} for region {}",
- servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName);
+ serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), regionName);
}
region.basicBridgePutAll(map, Collections.<Object, VersionTag>emptyMap(),
- servConn.getProxyID(), eventId, false, null);
+ serverConnection.getProxyID(), eventId, false, null);
if (region instanceof PartitionedRegion) {
PartitionedRegion pr = (PartitionedRegion) region;
if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) {
- writeReplyWithRefreshMetadata(msg, servConn, pr, pr.getNetworkHopType());
+ writeReplyWithRefreshMetadata(clientMessage, serverConnection, pr, pr.getNetworkHopType());
pr.clearNetworkHopData();
replyWithMetaData = true;
}
}
} catch (RegionDestroyedException rde) {
- writeException(msg, rde, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, rde, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
} catch (ResourceException re) {
- writeException(msg, re, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, re, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
} catch (PutAllPartialResultException pre) {
- writeException(msg, pre, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, pre, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
} catch (Exception ce) {
// If an interrupted exception is thrown, rethrow it
- checkForInterrupt(servConn, ce);
+ checkForInterrupt(serverConnection, ce);
// If an exception occurs during the put, preserve the connection
- writeException(msg, ce, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ce, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
logger.warn(LocalizedMessage.create(LocalizedStrings.Generic_0_UNEXPECTED_EXCEPTION,
- servConn.getName()), ce);
+ serverConnection.getName()), ce);
return;
} finally {
long oldStart = start;
@@ -223,12 +223,12 @@ public class PutAll extends BaseCommand {
// Increment statistics and write the reply
if (!replyWithMetaData) {
- writeReply(msg, servConn);
+ writeReply(clientMessage, serverConnection);
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sent putAll response back to {} for region {}", servConn.getName(),
- servConn.getSocketString(), regionName);
+ logger.debug("{}: Sent putAll response back to {} for region {}", serverConnection.getName(),
+ serverConnection.getSocketString(), regionName);
}
stats.incWritePutAllResponseTime(DistributionStats.getStatTime() - start);
}
http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutAll70.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutAll70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutAll70.java
index c5fcbae..ae2de09 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutAll70.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutAll70.java
@@ -62,7 +62,7 @@ public class PutAll70 extends BaseCommand {
private PutAll70() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long startp)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long startp)
throws IOException, InterruptedException {
long start = startp; // copy this since we need to modify it
Part regionNamePart = null, numberOfKeysPart = null, keyPart = null, valuePart = null;
@@ -74,11 +74,11 @@ public class PutAll70 extends BaseCommand {
VersionedObjectList response = null;
StringBuffer errMessage = new StringBuffer();
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- CacheServerStats stats = servConn.getCacheServerStats();
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
// requiresResponse = true;
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
{
long oldStart = start;
start = DistributionStats.getStatTime();
@@ -88,40 +88,40 @@ public class PutAll70 extends BaseCommand {
try {
// Retrieve the data from the message parts
// part 0: region name
- regionNamePart = msg.getPart(0);
+ regionNamePart = clientMessage.getPart(0);
regionName = regionNamePart.getString();
if (regionName == null) {
String putAllMsg =
LocalizedStrings.PutAll_THE_INPUT_REGION_NAME_FOR_THE_PUTALL_REQUEST_IS_NULL
.toLocalizedString();
- logger.warn("{}: {}", servConn.getName(), putAllMsg);
+ logger.warn("{}: {}", serverConnection.getName(), putAllMsg);
errMessage.append(putAllMsg);
- writeErrorResponse(msg, MessageType.PUT_DATA_ERROR, errMessage.toString(), servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.PUT_DATA_ERROR, errMessage.toString(), serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
LocalRegion region = (LocalRegion) crHelper.getRegion(regionName);
if (region == null) {
String reason = " was not found during put request";
- writeRegionDestroyedEx(msg, regionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
// part 1: eventID
- eventPart = msg.getPart(1);
+ eventPart = clientMessage.getPart(1);
ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm());
long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
- EventID eventId = new EventID(servConn.getEventMemberIDByteArray(), threadId, sequenceId);
+ EventID eventId = new EventID(serverConnection.getEventMemberIDByteArray(), threadId, sequenceId);
// part 2: invoke callbacks (used by import)
- Part callbacksPart = msg.getPart(2);
+ Part callbacksPart = clientMessage.getPart(2);
boolean skipCallbacks = callbacksPart.getInt() == 1 ? true : false;
// part 3: number of keys
- numberOfKeysPart = msg.getPart(3);
+ numberOfKeysPart = clientMessage.getPart(3);
numberOfKeys = numberOfKeysPart.getInt();
// building the map
@@ -129,28 +129,28 @@ public class PutAll70 extends BaseCommand {
Map<Object, VersionTag> retryVersions = new LinkedHashMap<Object, VersionTag>();
// Map isObjectMap = new LinkedHashMap();
for (int i = 0; i < numberOfKeys; i++) {
- keyPart = msg.getPart(4 + i * 2);
+ keyPart = clientMessage.getPart(4 + i * 2);
key = keyPart.getStringOrObject();
if (key == null) {
String putAllMsg =
LocalizedStrings.PutAll_ONE_OF_THE_INPUT_KEYS_FOR_THE_PUTALL_REQUEST_IS_NULL
.toLocalizedString();
- logger.warn("{}: {}", servConn.getName(), putAllMsg);
+ logger.warn("{}: {}", serverConnection.getName(), putAllMsg);
errMessage.append(putAllMsg);
- writeErrorResponse(msg, MessageType.PUT_DATA_ERROR, errMessage.toString(), servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.PUT_DATA_ERROR, errMessage.toString(), serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- valuePart = msg.getPart(4 + i * 2 + 1);
+ valuePart = clientMessage.getPart(4 + i * 2 + 1);
if (valuePart.isNull()) {
String putAllMsg =
LocalizedStrings.PutAll_ONE_OF_THE_INPUT_VALUES_FOR_THE_PUTALL_REQUEST_IS_NULL
.toLocalizedString();
- logger.warn("{}: {}", servConn.getName(), putAllMsg);
+ logger.warn("{}: {}", serverConnection.getName(), putAllMsg);
errMessage.append(putAllMsg);
- writeErrorResponse(msg, MessageType.PUT_DATA_ERROR, errMessage.toString(), servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.PUT_DATA_ERROR, errMessage.toString(), serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -170,7 +170,7 @@ public class PutAll70 extends BaseCommand {
value = valuePart.getSerializedForm();
}
// put serializedform for auth. It will be modified with auth callback
- if (msg.isRetry()) {
+ if (clientMessage.isRetry()) {
// Constuct the thread id/sequence id information for this element in the
// put all map
@@ -198,15 +198,15 @@ public class PutAll70 extends BaseCommand {
// isObjectMap.put(key, new Boolean(isObject));
} // for
- if (msg.getNumberOfParts() == (4 + 2 * numberOfKeys + 1)) {// it means optional timeout has
+ if (clientMessage.getNumberOfParts() == (4 + 2 * numberOfKeys + 1)) {// it means optional timeout has
// been added
- int timeout = msg.getPart(4 + 2 * numberOfKeys).getInt();
- servConn.setRequestSpecificTimeout(timeout);
+ int timeout = clientMessage.getPart(4 + 2 * numberOfKeys).getInt();
+ serverConnection.setRequestSpecificTimeout(timeout);
}
this.securityService.authorizeRegionWrite(regionName);
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
authzRequest.createRegionAuthorize(regionName);
@@ -231,10 +231,10 @@ public class PutAll70 extends BaseCommand {
if (logger.isDebugEnabled()) {
logger.debug("{}: Received putAll request ({} bytes) from {} for region {}",
- servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName);
+ serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), regionName);
}
- response = region.basicBridgePutAll(map, retryVersions, servConn.getProxyID(), eventId,
+ response = region.basicBridgePutAll(map, retryVersions, serverConnection.getProxyID(), eventId,
skipCallbacks, null);
if (!region.getConcurrencyChecksEnabled()) {
// the client only needs this if versioning is being used
@@ -244,33 +244,33 @@ public class PutAll70 extends BaseCommand {
if (region instanceof PartitionedRegion) {
PartitionedRegion pr = (PartitionedRegion) region;
if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) {
- writeReplyWithRefreshMetadata(msg, response, servConn, pr, pr.getNetworkHopType());
+ writeReplyWithRefreshMetadata(clientMessage, response, serverConnection, pr, pr.getNetworkHopType());
pr.clearNetworkHopData();
replyWithMetaData = true;
}
}
} catch (RegionDestroyedException rde) {
- writeException(msg, rde, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, rde, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
} catch (ResourceException re) {
- writeException(msg, re, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, re, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
} catch (PutAllPartialResultException pre) {
- writeException(msg, pre, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, pre, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
} catch (Exception ce) {
// If an interrupted exception is thrown , rethrow it
- checkForInterrupt(servConn, ce);
+ checkForInterrupt(serverConnection, ce);
// If an exception occurs during the put, preserve the connection
- writeException(msg, ce, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ce, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
// if (logger.fineEnabled()) {
logger.warn(LocalizedMessage.create(LocalizedStrings.Generic_0_UNEXPECTED_EXCEPTION,
- servConn.getName()), ce);
+ serverConnection.getName()), ce);
// }
return;
} finally {
@@ -279,11 +279,11 @@ public class PutAll70 extends BaseCommand {
stats.incProcessPutAllTime(start - oldStart);
}
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sending putAll70 response back to {} for region {}: {}", servConn.getName(),
- servConn.getSocketString(), regionName, response);
+ logger.debug("{}: Sending putAll70 response back to {} for region {}: {}", serverConnection.getName(),
+ serverConnection.getSocketString(), regionName, response);
}
// Starting in 7.0.1 we do not send the keys back
- if (response != null && Version.GFE_70.compareTo(servConn.getClientVersion()) < 0) {
+ if (response != null && Version.GFE_70.compareTo(serverConnection.getClientVersion()) < 0) {
if (logger.isDebugEnabled()) {
logger.debug("setting putAll keys to null");
}
@@ -292,14 +292,14 @@ public class PutAll70 extends BaseCommand {
// Increment statistics and write the reply
if (!replyWithMetaData) {
- writeReply(msg, response, servConn);
+ writeReply(clientMessage, response, serverConnection);
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
stats.incWritePutAllResponseTime(DistributionStats.getStatTime() - start);
}
@Override
- protected void writeReply(Message origMsg, ServerConnection servConn) throws IOException {
+ protected void writeReply(Message origMsg, ServerConnection serverConnection) throws IOException {
throw new UnsupportedOperationException();
}
@@ -311,7 +311,7 @@ public class PutAll70 extends BaseCommand {
replyMsg.setMessageType(MessageType.REPLY);
replyMsg.setNumberOfParts(2);
replyMsg.setTransactionId(origMsg.getTransactionId());
- replyMsg.addBytesPart(OK_BYTES);
+ replyMsg.addBytesPart(okBytes());
if (response != null) {
response.clearObjects();
replyMsg.addObjPart(response);
@@ -323,7 +323,7 @@ public class PutAll70 extends BaseCommand {
}
@Override
- protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection servConn,
+ protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection serverConnection,
PartitionedRegion pr, byte nwHop) throws IOException {
throw new UnsupportedOperationException();
}
@@ -343,7 +343,7 @@ public class PutAll70 extends BaseCommand {
replyMsg.send(servConn);
pr.getPrStats().incPRMetaDataSentCount();
if (logger.isTraceEnabled()) {
- logger.trace("{}: rpl with REFRESH_METADAT tx: {}", servConn.getName(),
+ logger.trace("{}: rpl with REFRESH_METADATA tx: {}", servConn.getName(),
origMsg.getTransactionId());
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutAll80.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutAll80.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutAll80.java
index a6285ed..aed5926 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutAll80.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutAll80.java
@@ -75,7 +75,7 @@ public class PutAll80 extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long startp)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long startp)
throws IOException, InterruptedException {
long start = startp; // copy this since we need to modify it
Part regionNamePart = null, numberOfKeysPart = null, keyPart = null, valuePart = null;
@@ -87,12 +87,12 @@ public class PutAll80 extends BaseCommand {
VersionedObjectList response = null;
StringBuffer errMessage = new StringBuffer();
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- CacheServerStats stats = servConn.getCacheServerStats();
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
// requiresResponse = true;
- servConn.setAsTrue(REQUIRES_RESPONSE);
- servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); // new in 8.0
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); // new in 8.0
{
long oldStart = start;
start = DistributionStats.getStatTime();
@@ -102,60 +102,60 @@ public class PutAll80 extends BaseCommand {
try {
// Retrieve the data from the message parts
// part 0: region name
- regionNamePart = msg.getPart(0);
+ regionNamePart = clientMessage.getPart(0);
regionName = regionNamePart.getString();
if (regionName == null) {
String putAllMsg =
LocalizedStrings.PutAll_THE_INPUT_REGION_NAME_FOR_THE_PUTALL_REQUEST_IS_NULL
.toLocalizedString();
- logger.warn("{}: {}", servConn.getName(), putAllMsg);
+ logger.warn("{}: {}", serverConnection.getName(), putAllMsg);
errMessage.append(putAllMsg);
- writeChunkedErrorResponse(msg, MessageType.PUT_DATA_ERROR, errMessage.toString(), servConn);
- servConn.setAsTrue(RESPONDED);
+ writeChunkedErrorResponse(clientMessage, MessageType.PUT_DATA_ERROR, errMessage.toString(), serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
LocalRegion region = (LocalRegion) crHelper.getRegion(regionName);
if (region == null) {
String reason = " was not found during putAll request";
- writeRegionDestroyedEx(msg, regionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
final int BASE_PART_COUNT = getBasePartCount();
// part 1: eventID
- eventPart = msg.getPart(1);
+ eventPart = clientMessage.getPart(1);
ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm());
long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
- EventID eventId = new EventID(servConn.getEventMemberIDByteArray(), threadId, sequenceId);
+ EventID eventId = new EventID(serverConnection.getEventMemberIDByteArray(), threadId, sequenceId);
Breadcrumbs.setEventId(eventId);
// part 2: invoke callbacks (used by import)
- Part callbacksPart = msg.getPart(2);
+ Part callbacksPart = clientMessage.getPart(2);
boolean skipCallbacks = callbacksPart.getInt() == 1 ? true : false;
// part 3: flags
- int flags = msg.getPart(3).getInt();
+ int flags = clientMessage.getPart(3).getInt();
boolean clientIsEmpty = (flags & PutAllOp.FLAG_EMPTY) != 0;
boolean clientHasCCEnabled = (flags & PutAllOp.FLAG_CONCURRENCY_CHECKS) != 0;
// part 4: number of keys
- numberOfKeysPart = msg.getPart(4);
+ numberOfKeysPart = clientMessage.getPart(4);
numberOfKeys = numberOfKeysPart.getInt();
- Object callbackArg = getOptionalCallbackArg(msg);
+ Object callbackArg = getOptionalCallbackArg(clientMessage);
if (logger.isDebugEnabled()) {
StringBuilder buffer = new StringBuilder();
- buffer.append(servConn.getName()).append(": Received ").append(this.putAllClassName())
- .append(" request from ").append(servConn.getSocketString()).append(" for region ")
- .append(regionName).append(callbackArg != null ? (" callbackArg " + callbackArg) : "")
- .append(" with ").append(numberOfKeys).append(" entries.");
+ buffer.append(serverConnection.getName()).append(": Received ").append(this.putAllClassName())
+ .append(" request from ").append(serverConnection.getSocketString()).append(" for region ")
+ .append(regionName).append(callbackArg != null ? (" callbackArg " + callbackArg) : "")
+ .append(" with ").append(numberOfKeys).append(" entries.");
logger.debug(buffer.toString());
}
// building the map
@@ -163,30 +163,28 @@ public class PutAll80 extends BaseCommand {
Map<Object, VersionTag> retryVersions = new LinkedHashMap<Object, VersionTag>();
// Map isObjectMap = new LinkedHashMap();
for (int i = 0; i < numberOfKeys; i++) {
- keyPart = msg.getPart(BASE_PART_COUNT + i * 2);
+ keyPart = clientMessage.getPart(BASE_PART_COUNT + i * 2);
key = keyPart.getStringOrObject();
if (key == null) {
String putAllMsg =
LocalizedStrings.PutAll_ONE_OF_THE_INPUT_KEYS_FOR_THE_PUTALL_REQUEST_IS_NULL
.toLocalizedString();
- logger.warn("{}: {}", servConn.getName(), putAllMsg);
+ logger.warn("{}: {}", serverConnection.getName(), putAllMsg);
errMessage.append(putAllMsg);
- writeChunkedErrorResponse(msg, MessageType.PUT_DATA_ERROR, errMessage.toString(),
- servConn);
- servConn.setAsTrue(RESPONDED);
+ writeChunkedErrorResponse(clientMessage, MessageType.PUT_DATA_ERROR, errMessage.toString(), serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- valuePart = msg.getPart(BASE_PART_COUNT + i * 2 + 1);
+ valuePart = clientMessage.getPart(BASE_PART_COUNT + i * 2 + 1);
if (valuePart.isNull()) {
String putAllMsg =
LocalizedStrings.PutAll_ONE_OF_THE_INPUT_VALUES_FOR_THE_PUTALL_REQUEST_IS_NULL
.toLocalizedString();
- logger.warn("{}: {}", servConn.getName(), putAllMsg);
+ logger.warn("{}: {}", serverConnection.getName(), putAllMsg);
errMessage.append(putAllMsg);
- writeChunkedErrorResponse(msg, MessageType.PUT_DATA_ERROR, errMessage.toString(),
- servConn);
- servConn.setAsTrue(RESPONDED);
+ writeChunkedErrorResponse(clientMessage, MessageType.PUT_DATA_ERROR, errMessage.toString(), serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -206,7 +204,7 @@ public class PutAll80 extends BaseCommand {
value = valuePart.getSerializedForm();
}
// put serializedform for auth. It will be modified with auth callback
- if (msg.isRetry()) {
+ if (clientMessage.isRetry()) {
// Constuct the thread id/sequence id information for this element in the
// put all map
@@ -234,16 +232,16 @@ public class PutAll80 extends BaseCommand {
// isObjectMap.put(key, new Boolean(isObject));
} // for
- if (msg.getNumberOfParts() == (BASE_PART_COUNT + 2 * numberOfKeys + 1)) {// it means optional
+ if (clientMessage.getNumberOfParts() == (BASE_PART_COUNT + 2 * numberOfKeys + 1)) {// it means optional
// timeout has been
// added
- int timeout = msg.getPart(BASE_PART_COUNT + 2 * numberOfKeys).getInt();
- servConn.setRequestSpecificTimeout(timeout);
+ int timeout = clientMessage.getPart(BASE_PART_COUNT + 2 * numberOfKeys).getInt();
+ serverConnection.setRequestSpecificTimeout(timeout);
}
this.securityService.authorizeRegionWrite(regionName);
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
authzRequest.createRegionAuthorize(regionName);
@@ -267,7 +265,7 @@ public class PutAll80 extends BaseCommand {
*/
}
- response = region.basicBridgePutAll(map, retryVersions, servConn.getProxyID(), eventId,
+ response = region.basicBridgePutAll(map, retryVersions, serverConnection.getProxyID(), eventId,
skipCallbacks, callbackArg);
if (!region.getConcurrencyChecksEnabled() || clientIsEmpty || !clientHasCCEnabled) {
// the client only needs this if versioning is being used and the client
@@ -283,32 +281,32 @@ public class PutAll80 extends BaseCommand {
if (region instanceof PartitionedRegion) {
PartitionedRegion pr = (PartitionedRegion) region;
if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) {
- writeReplyWithRefreshMetadata(msg, response, servConn, pr, pr.getNetworkHopType());
+ writeReplyWithRefreshMetadata(clientMessage, response, serverConnection, pr, pr.getNetworkHopType());
pr.clearNetworkHopData();
replyWithMetaData = true;
}
}
} catch (RegionDestroyedException rde) {
- writeChunkedException(msg, rde, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeChunkedException(clientMessage, rde, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
} catch (ResourceException re) {
- writeChunkedException(msg, re, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeChunkedException(clientMessage, re, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
} catch (PutAllPartialResultException pre) {
- writeChunkedException(msg, pre, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeChunkedException(clientMessage, pre, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
} catch (Exception ce) {
// If an interrupted exception is thrown , rethrow it
- checkForInterrupt(servConn, ce);
+ checkForInterrupt(serverConnection, ce);
// If an exception occurs during the put, preserve the connection
- writeChunkedException(msg, ce, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeChunkedException(clientMessage, ce, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
logger.warn(LocalizedMessage.create(LocalizedStrings.Generic_0_UNEXPECTED_EXCEPTION,
- servConn.getName()), ce);
+ serverConnection.getName()), ce);
return;
} finally {
long oldStart = start;
@@ -316,21 +314,21 @@ public class PutAll80 extends BaseCommand {
stats.incProcessPutAllTime(start - oldStart);
}
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sending {} response back to {} for regin {} {}", servConn.getName(),
- putAllClassName(), servConn.getSocketString(), regionName,
+ logger.debug("{}: Sending {} response back to {} for regin {} {}", serverConnection.getName(),
+ putAllClassName(), serverConnection.getSocketString(), regionName,
(logger.isTraceEnabled() ? ": " + response : ""));
}
// Increment statistics and write the reply
if (!replyWithMetaData) {
- writeReply(msg, response, servConn);
+ writeReply(clientMessage, response, serverConnection);
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
stats.incWritePutAllResponseTime(DistributionStats.getStatTime() - start);
}
@Override
- protected void writeReply(Message origMsg, ServerConnection servConn) throws IOException {
+ protected void writeReply(Message origMsg, ServerConnection serverConnection) throws IOException {
throw new UnsupportedOperationException();
}
@@ -351,7 +349,7 @@ public class PutAll80 extends BaseCommand {
}
replyMsg.sendHeader();
if (listSize > 0) {
- int chunkSize = 2 * maximumChunkSize;
+ int chunkSize = 2 * MAXIMUM_CHUNK_SIZE;
// Chunker will stream over the list in its toData method
VersionedObjectList.Chunker chunk =
new VersionedObjectList.Chunker(response, chunkSize, false, false);
@@ -383,7 +381,7 @@ public class PutAll80 extends BaseCommand {
}
@Override
- protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection servConn,
+ protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection serverConnection,
PartitionedRegion pr, byte nwHop) throws IOException {
throw new UnsupportedOperationException();
}
@@ -411,7 +409,7 @@ public class PutAll80 extends BaseCommand {
replyMsg.setLastChunk(false);
replyMsg.sendChunk(servConn);
- int chunkSize = 2 * maximumChunkSize; // maximumChunkSize
+ int chunkSize = 2 * MAXIMUM_CHUNK_SIZE; // MAXIMUM_CHUNK_SIZE
// Chunker will stream over the list in its toData method
VersionedObjectList.Chunker chunk =
new VersionedObjectList.Chunker(response, chunkSize, false, false);
@@ -437,7 +435,7 @@ public class PutAll80 extends BaseCommand {
}
pr.getPrStats().incPRMetaDataSentCount();
if (logger.isTraceEnabled()) {
- logger.trace("{}: rpl with REFRESH_METADAT tx: {}", servConn.getName(),
+ logger.trace("{}: rpl with REFRESH_METADATA tx: {}", servConn.getName(),
origMsg.getTransactionId());
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutUserCredentials.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutUserCredentials.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutUserCredentials.java
index 198eed6..dc3de67 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutUserCredentials.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutUserCredentials.java
@@ -32,39 +32,39 @@ public class PutUserCredentials extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, ClassNotFoundException, InterruptedException {
- boolean isSecureMode = msg.isSecureMode();
+ boolean isSecureMode = clientMessage.isSecureMode();
// if (!isSecureMode)
// client has not send secuirty header, need to send exception and log this in security (file)
if (isSecureMode) {
- int numberOfParts = msg.getNumberOfParts();
+ int numberOfParts = clientMessage.getNumberOfParts();
if (numberOfParts == 1) {
// need to get credentials
try {
- servConn.setAsTrue(REQUIRES_RESPONSE);
- byte[] uniqueId = servConn.setCredentials(msg);
- writeResponse(uniqueId, null, msg, false, servConn);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
+ byte[] uniqueId = serverConnection.setCredentials(clientMessage);
+ writeResponse(uniqueId, null, clientMessage, false, serverConnection);
} catch (GemFireSecurityException gfse) {
- if (servConn.getSecurityLogWriter().warningEnabled()) {
- servConn.getSecurityLogWriter().warning(LocalizedStrings.ONE_ARG, servConn.getName()
- + ": Security exception: " + gfse.toString()
- + (gfse.getCause() != null ? ", caused by: " + gfse.getCause().toString() : ""));
+ if (serverConnection.getSecurityLogWriter().warningEnabled()) {
+ serverConnection.getSecurityLogWriter().warning(LocalizedStrings.ONE_ARG, serverConnection.getName()
+ + ": Security exception: " + gfse.toString()
+ + (gfse.getCause() != null ? ", caused by: " + gfse.getCause().toString() : ""));
}
- writeException(msg, gfse, false, servConn);
+ writeException(clientMessage, gfse, false, serverConnection);
} catch (Exception ex) {
- if (servConn.getLogWriter().warningEnabled()) {
- servConn.getLogWriter().warning(
+ if (serverConnection.getLogWriter().warningEnabled()) {
+ serverConnection.getLogWriter().warning(
LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1,
- new Object[] {servConn.getProxyID(), ""}, ex);
+ new Object[] { serverConnection.getProxyID(), ""}, ex);
}
- writeException(msg, ex, false, servConn);
+ writeException(clientMessage, ex, false, serverConnection);
} finally {
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
}
} else {
http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query.java
index d3c0393..8b5b35e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query.java
@@ -43,38 +43,38 @@ public class Query extends BaseCommandQuery {
protected Query() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, InterruptedException {
// Based on MessageType.DESTROY
// Added by gregp 10/18/05
- servConn.setAsTrue(REQUIRES_RESPONSE);
- servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
// Retrieve the data from the message parts
- String queryString = msg.getPart(0).getString();
+ String queryString = clientMessage.getPart(0).getString();
// this is optional part for message specific timeout, which right now send by native client
// need to take care while adding new message
- if (msg.getNumberOfParts() == 3) {
- int timeout = msg.getPart(2).getInt();
- servConn.setRequestSpecificTimeout(timeout);
+ if (clientMessage.getNumberOfParts() == 3) {
+ int timeout = clientMessage.getPart(2).getInt();
+ serverConnection.setRequestSpecificTimeout(timeout);
}
if (logger.isDebugEnabled()) {
- logger.debug("{}: Received query request from {} queryString: {}", servConn.getName(),
- servConn.getSocketString(), queryString);
+ logger.debug("{}: Received query request from {} queryString: {}", serverConnection.getName(),
+ serverConnection.getSocketString(), queryString);
}
try {
// Create query
QueryService queryService =
- servConn.getCachedRegionHelper().getCache().getLocalQueryService();
+ serverConnection.getCachedRegionHelper().getCache().getLocalQueryService();
org.apache.geode.cache.query.Query query = queryService.newQuery(queryString);
Set regionNames = ((DefaultQuery) query).getRegionsInQuery(null);
// Authorization check
QueryOperationContext queryContext = null;
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
queryContext = authzRequest.queryAuthorize(queryString, regionNames);
String newQueryString = queryContext.getQuery();
@@ -88,11 +88,11 @@ public class Query extends BaseCommandQuery {
}
}
- processQuery(msg, query, queryString, regionNames, start, null, queryContext, servConn, true);
+ processQuery(clientMessage, query, queryString, regionNames, start, null, queryContext, serverConnection, true);
} catch (QueryInvalidException e) {
throw new QueryInvalidException(e.getMessage() + queryString);
} catch (QueryExecutionLowMemoryException e) {
- writeQueryResponseException(msg, e, false, servConn);
+ writeQueryResponseException(clientMessage, e, serverConnection);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query651.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query651.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query651.java
index 5849431..97f5d56 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query651.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query651.java
@@ -44,40 +44,40 @@ public class Query651 extends BaseCommandQuery {
protected Query651() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, InterruptedException {
// Based on MessageType.DESTROY
// Added by gregp 10/18/05
- servConn.setAsTrue(REQUIRES_RESPONSE);
- servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
// Retrieve the data from the message parts
- String queryString = msg.getPart(0).getString();
+ String queryString = clientMessage.getPart(0).getString();
long compiledQueryId = 0;
Object[] queryParams = null;
try {
- if (msg.getMessageType() == MessageType.QUERY_WITH_PARAMETERS) {
+ if (clientMessage.getMessageType() == MessageType.QUERY_WITH_PARAMETERS) {
// Query with parameters supported from 6.6 onwards.
- int params = msg.getPart(1).getInt(); // Number of parameters.
+ int params = clientMessage.getPart(1).getInt(); // Number of parameters.
// In case of native client there will be extra two parameters at 2 and 3 index.
int paramStartIndex = 2;
- if (msg.getNumberOfParts() > (1 /* type */ + 1 /* query string */ + 1 /* params length */
- + params /* number of params */)) {
- int timeout = msg.getPart(3).getInt();
- servConn.setRequestSpecificTimeout(timeout);
+ if (clientMessage.getNumberOfParts() > (1 /* type */ + 1 /* query string */ + 1 /* params length */
+ + params /* number of params */)) {
+ int timeout = clientMessage.getPart(3).getInt();
+ serverConnection.setRequestSpecificTimeout(timeout);
paramStartIndex = 4;
}
// Get the query execution parameters.
queryParams = new Object[params];
for (int i = 0; i < queryParams.length; i++) {
- queryParams[i] = msg.getPart(i + paramStartIndex).getObject();
+ queryParams[i] = clientMessage.getPart(i + paramStartIndex).getObject();
}
} else {
// this is optional part for message specific timeout, which right now send by native client
// need to take care while adding new message
- if (msg.getNumberOfParts() == 3) {
- int timeout = msg.getPart(2).getInt();
- servConn.setRequestSpecificTimeout(timeout);
+ if (clientMessage.getNumberOfParts() == 3) {
+ int timeout = clientMessage.getPart(2).getInt();
+ serverConnection.setRequestSpecificTimeout(timeout);
}
}
} catch (ClassNotFoundException cne) {
@@ -85,19 +85,19 @@ public class Query651 extends BaseCommandQuery {
}
if (logger.isDebugEnabled()) {
- logger.debug("{}: Received query request from {} queryString: {}{}", servConn.getName(),
- servConn.getSocketString(), queryString,
+ logger.debug("{}: Received query request from {} queryString: {}{}", serverConnection.getName(),
+ serverConnection.getSocketString(), queryString,
(queryParams != null ? (" with num query parameters :" + queryParams.length) : ""));
}
try {
// Create query
QueryService queryService =
- servConn.getCachedRegionHelper().getCache().getLocalQueryService();
+ serverConnection.getCachedRegionHelper().getCache().getLocalQueryService();
org.apache.geode.cache.query.Query query = null;
if (queryParams != null) {
// Its a compiled query.
- CacheClientNotifier ccn = servConn.getAcceptor().getCacheClientNotifier();
+ CacheClientNotifier ccn = serverConnection.getAcceptor().getCacheClientNotifier();
query = ccn.getCompiledQuery(queryString);
if (query == null) {
// This is first time the query is seen by this server.
@@ -114,7 +114,7 @@ public class Query651 extends BaseCommandQuery {
// Authorization check
QueryOperationContext queryContext = null;
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
queryContext = authzRequest.queryAuthorize(queryString, regionNames, queryParams);
String newQueryString = queryContext.getQuery();
@@ -128,8 +128,7 @@ public class Query651 extends BaseCommandQuery {
}
}
- processQueryUsingParams(msg, query, queryString, regionNames, start, null, queryContext,
- servConn, true, queryParams);
+ processQueryUsingParams(clientMessage, query, queryString, regionNames, start, null, queryContext, serverConnection, true, queryParams);
} catch (QueryInvalidException e) {
throw new QueryInvalidException(e.getMessage() + queryString);
}
http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterDataSerializers.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterDataSerializers.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterDataSerializers.java
index 7d28d52..d1c101f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterDataSerializers.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterDataSerializers.java
@@ -37,22 +37,22 @@ public class RegisterDataSerializers extends BaseCommand {
private RegisterDataSerializers() {}
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, ClassNotFoundException {
if (logger.isDebugEnabled()) {
logger.debug("{}: Received register dataserializer request ({} parts) from {}",
- servConn.getName(), msg.getNumberOfParts(), servConn.getSocketString());
+ serverConnection.getName(), clientMessage.getNumberOfParts(), serverConnection.getSocketString());
}
- int noOfParts = msg.getNumberOfParts();
+ int noOfParts = clientMessage.getNumberOfParts();
// 2 parts per instantiator and one eventId part
int noOfDataSerializers = (noOfParts - 1) / 2;
// retrieve eventID from the last Part
- ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(msg.getPart(noOfParts - 1).getSerializedForm());
+ ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(clientMessage.getPart(noOfParts - 1).getSerializedForm());
long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
- EventID eventId = new EventID(servConn.getEventMemberIDByteArray(), threadId, sequenceId);
+ EventID eventId = new EventID(serverConnection.getEventMemberIDByteArray(), threadId, sequenceId);
byte[][] serializedDataSerializers = new byte[noOfDataSerializers * 2][];
boolean caughtCNFE = false;
@@ -60,12 +60,12 @@ public class RegisterDataSerializers extends BaseCommand {
try {
for (int i = 0; i < noOfParts - 1; i = i + 2) {
- Part dataSerializerClassNamePart = msg.getPart(i);
+ Part dataSerializerClassNamePart = clientMessage.getPart(i);
serializedDataSerializers[i] = dataSerializerClassNamePart.getSerializedForm();
String dataSerializerClassName =
(String) CacheServerHelper.deserialize(serializedDataSerializers[i]);
- Part idPart = msg.getPart(i + 1);
+ Part idPart = clientMessage.getPart(i + 1);
serializedDataSerializers[i + 1] = idPart.getSerializedForm();
int id = idPart.getInt();
@@ -73,7 +73,7 @@ public class RegisterDataSerializers extends BaseCommand {
try {
dataSerializerClass = InternalDataSerializer.getCachedClass(dataSerializerClassName);
InternalDataSerializer.register(dataSerializerClass, true, eventId,
- servConn.getProxyID());
+ serverConnection.getProxyID());
} catch (ClassNotFoundException e) {
// If a ClassNotFoundException is caught, store it, but continue
// processing other instantiators
@@ -82,26 +82,26 @@ public class RegisterDataSerializers extends BaseCommand {
}
}
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
}
// If a ClassNotFoundException was caught while processing the
// instantiators, send it back to the client. Note: This only sends
// the last CNFE.
if (caughtCNFE) {
- writeException(msg, cnfe, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, cnfe, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
}
// Send reply to client if necessary. If an exception occurs in the above
// code, then the reply has already been sent.
- if (!servConn.getTransientFlag(RESPONDED)) {
- writeReply(msg, servConn);
+ if (!serverConnection.getTransientFlag(RESPONDED)) {
+ writeReply(clientMessage, serverConnection);
}
if (logger.isDebugEnabled()) {
- logger.debug("Registered dataserializer for MembershipId = {}", servConn.getMembershipID());
+ logger.debug("Registered dataserializer for MembershipId = {}", serverConnection.getMembershipID());
}
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInstantiators.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInstantiators.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInstantiators.java
index 1e701fc..2b63337 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInstantiators.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInstantiators.java
@@ -49,23 +49,23 @@ public class RegisterInstantiators extends BaseCommand {
private RegisterInstantiators() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, ClassNotFoundException {
if (logger.isDebugEnabled()) {
logger.debug("{}: Received register instantiator request ({} parts) from {}",
- servConn.getName(), msg.getNumberOfParts(), servConn.getSocketString());
+ serverConnection.getName(), clientMessage.getNumberOfParts(), serverConnection.getSocketString());
}
- int noOfParts = msg.getNumberOfParts();
+ int noOfParts = clientMessage.getNumberOfParts();
// Assert parts
Assert.assertTrue((noOfParts - 1) % 3 == 0);
// 3 parts per instantiator and one eventId part
int noOfInstantiators = (noOfParts - 1) / 3;
// retrieve eventID from the last Part
- ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(msg.getPart(noOfParts - 1).getSerializedForm());
+ ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(clientMessage.getPart(noOfParts - 1).getSerializedForm());
long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
- EventID eventId = new EventID(servConn.getEventMemberIDByteArray(), threadId, sequenceId);
+ EventID eventId = new EventID(serverConnection.getEventMemberIDByteArray(), threadId, sequenceId);
byte[][] serializedInstantiators = new byte[noOfInstantiators * 3][];
boolean caughtCNFE = false;
@@ -73,17 +73,17 @@ public class RegisterInstantiators extends BaseCommand {
try {
for (int i = 0; i < noOfParts - 1; i = i + 3) {
- Part instantiatorPart = msg.getPart(i);
+ Part instantiatorPart = clientMessage.getPart(i);
serializedInstantiators[i] = instantiatorPart.getSerializedForm();
String instantiatorClassName =
(String) CacheServerHelper.deserialize(serializedInstantiators[i]);
- Part instantiatedPart = msg.getPart(i + 1);
+ Part instantiatedPart = clientMessage.getPart(i + 1);
serializedInstantiators[i + 1] = instantiatedPart.getSerializedForm();
String instantiatedClassName =
(String) CacheServerHelper.deserialize(serializedInstantiators[i + 1]);
- Part idPart = msg.getPart(i + 2);
+ Part idPart = clientMessage.getPart(i + 2);
serializedInstantiators[i + 2] = idPart.getSerializedForm();
int id = idPart.getInt();
@@ -92,7 +92,7 @@ public class RegisterInstantiators extends BaseCommand {
instantiatorClass = InternalDataSerializer.getCachedClass(instantiatorClassName);
instantiatedClass = InternalDataSerializer.getCachedClass(instantiatedClassName);
InternalInstantiator.register(instantiatorClass, instantiatedClass, id, true, eventId,
- servConn.getProxyID());
+ serverConnection.getProxyID());
} catch (ClassNotFoundException e) {
// If a ClassNotFoundException is caught, store it, but continue
// processing other instantiators
@@ -102,17 +102,17 @@ public class RegisterInstantiators extends BaseCommand {
}
} catch (Exception e) {
logger.warn(LocalizedMessage.create(LocalizedStrings.RegisterInstantiators_BAD_CLIENT,
- new Object[] {servConn.getMembershipID(), e.getLocalizedMessage()}));
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ new Object[] { serverConnection.getMembershipID(), e.getLocalizedMessage()}));
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
}
// If a ClassNotFoundException was caught while processing the
// instantiators, send it back to the client. Note: This only sends
// the last CNFE.
if (caughtCNFE) {
- writeException(msg, cnfe, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, cnfe, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
// Send the instantiators on to other clients if we hit an error
// due to a missing class, because they were not distributed
@@ -120,7 +120,7 @@ public class RegisterInstantiators extends BaseCommand {
// been distributed if successfully registered.
ClientInstantiatorMessage clientInstantiatorMessage =
new ClientInstantiatorMessage(EnumListenerEvent.AFTER_REGISTER_INSTANTIATOR,
- serializedInstantiators, servConn.getProxyID(), eventId);
+ serializedInstantiators, serverConnection.getProxyID(), eventId);
// Notify other clients
CacheClientNotifier.routeClientMessage(clientInstantiatorMessage);
@@ -129,12 +129,12 @@ public class RegisterInstantiators extends BaseCommand {
// Send reply to client if necessary. If an exception occurs in the above
// code, then the reply has already been sent.
- if (!servConn.getTransientFlag(RESPONDED)) {
- writeReply(msg, servConn);
+ if (!serverConnection.getTransientFlag(RESPONDED)) {
+ writeReply(clientMessage, serverConnection);
}
if (logger.isDebugEnabled()) {
- logger.debug("Registered instantiators for MembershipId = {}", servConn.getMembershipID());
+ logger.debug("Registered instantiators for MembershipId = {}", serverConnection.getMembershipID());
}
}