You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/31 23:15:24 UTC
[25/35] geode git commit: GEODE-2632: refactoring preparations for SecurityService and BaseCommand changes
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java
index 0ed7235..674082c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java
@@ -64,7 +64,8 @@ public class ExecuteRegionFunction66 extends BaseCommand {
private ExecuteRegionFunction66() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
+ public void cmdExecute(Message clientMessage, ServerConnection servConn, long start)
+ throws IOException {
String regionName = null;
Object function = null;
Object args = null;
@@ -80,7 +81,7 @@ public class ExecuteRegionFunction66 extends BaseCommand {
byte functionState = 0;
int functionTimeout = ConnectionImpl.DEFAULT_CLIENT_FUNCTION_TIMEOUT;
try {
- byte[] bytes = msg.getPart(0).getSerializedForm();
+ byte[] bytes = clientMessage.getPart(0).getSerializedForm();
functionState = bytes[0];
if (bytes.length >= 5
&& servConn.getClientVersion().ordinal() >= Version.GFE_8009.ordinal()) {
@@ -95,17 +96,17 @@ public class ExecuteRegionFunction66 extends BaseCommand {
servConn.setAsTrue(REQUIRES_RESPONSE);
servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
}
- regionName = msg.getPart(1).getString();
- function = msg.getPart(2).getStringOrObject();
- args = msg.getPart(3).getObject();
- Part part = msg.getPart(4);
+ regionName = clientMessage.getPart(1).getString();
+ function = clientMessage.getPart(2).getStringOrObject();
+ args = clientMessage.getPart(3).getObject();
+ Part part = clientMessage.getPart(4);
if (part != null) {
Object obj = part.getObject();
if (obj instanceof MemberMappedArgument) {
memberMappedArg = (MemberMappedArgument) obj;
}
}
- byte[] flags = msg.getPart(5).getSerializedForm();
+ byte[] flags = clientMessage.getPart(5).getSerializedForm();
if (servConn.getClientVersion().ordinal() > Version.GFE_81.ordinal()) {
isBucketsAsFilter = (flags[0] & ExecuteFunctionHelper.BUCKETS_AS_FILTER_MASK) != 0;
isReExecute = (flags[0] & ExecuteFunctionHelper.IS_REXECUTE_MASK) != 0 ? (byte) 1 : 0;
@@ -113,24 +114,24 @@ public class ExecuteRegionFunction66 extends BaseCommand {
isReExecute = flags[0];
isBucketsAsFilter = false;
}
- filterSize = msg.getPart(6).getInt();
+ filterSize = clientMessage.getPart(6).getInt();
if (filterSize != 0) {
filter = new HashSet<Object>();
partNumber = 7;
for (int i = 0; i < filterSize; i++) {
- filter.add(msg.getPart(partNumber + i).getStringOrObject());
+ filter.add(clientMessage.getPart(partNumber + i).getStringOrObject());
}
}
partNumber = 7 + filterSize;
- removedNodesSize = msg.getPart(partNumber).getInt();
+ removedNodesSize = clientMessage.getPart(partNumber).getInt();
if (removedNodesSize != 0) {
removedNodesSet = new HashSet<Object>();
partNumber = partNumber + 1;
for (int i = 0; i < removedNodesSize; i++) {
- removedNodesSet.add(msg.getPart(partNumber + i).getStringOrObject());
+ removedNodesSet.add(clientMessage.getPart(partNumber + i).getStringOrObject());
}
}
@@ -139,9 +140,9 @@ public class ExecuteRegionFunction66 extends BaseCommand {
LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
function), exception);
if (hasResult == 1) {
- writeChunkedException(msg, exception, false, servConn);
+ writeChunkedException(clientMessage, exception, servConn);
} else {
- writeException(msg, exception, false, servConn);
+ writeException(clientMessage, exception, false, servConn);
}
servConn.setAsTrue(RESPONDED);
return;
@@ -159,7 +160,7 @@ public class ExecuteRegionFunction66 extends BaseCommand {
.toLocalizedString("region");
}
logger.warn("{}: {}", servConn.getName(), message);
- sendError(hasResult, msg, message, servConn);
+ sendError(hasResult, clientMessage, message, servConn);
return;
}
@@ -169,7 +170,7 @@ public class ExecuteRegionFunction66 extends BaseCommand {
LocalizedStrings.ExecuteRegionFunction_THE_REGION_NAMED_0_WAS_NOT_FOUND_DURING_EXECUTE_FUNCTION_REQUEST
.toLocalizedString(regionName);
logger.warn("{}: {}", servConn.getName(), message);
- sendError(hasResult, msg, message, servConn);
+ sendError(hasResult, clientMessage, message, servConn);
return;
}
HandShake handShake = (HandShake) servConn.getHandshake();
@@ -185,7 +186,7 @@ public class ExecuteRegionFunction66 extends BaseCommand {
LocalizedStrings.ExecuteRegionFunction_THE_FUNCTION_0_HAS_NOT_BEEN_REGISTERED
.toLocalizedString(function);
logger.warn("{}: {}", servConn.getName(), message);
- sendError(hasResult, msg, message, servConn);
+ sendError(hasResult, clientMessage, message, servConn);
return;
} else {
byte functionStateOnServerSide = AbstractExecution.getFunctionState(functionObject.isHA(),
@@ -199,7 +200,7 @@ public class ExecuteRegionFunction66 extends BaseCommand {
LocalizedStrings.FunctionService_FUNCTION_ATTRIBUTE_MISMATCH_CLIENT_SERVER
.toLocalizedString(function);
logger.warn("{}: {}", servConn.getName(), message);
- sendError(hasResult, msg, message, servConn);
+ sendError(hasResult, clientMessage, message, servConn);
return;
}
}
@@ -222,7 +223,7 @@ public class ExecuteRegionFunction66 extends BaseCommand {
// Construct execution
AbstractExecution execution = (AbstractExecution) FunctionService.onRegion(region);
ChunkedMessage m = servConn.getFunctionResponseMessage();
- m.setTransactionId(msg.getTransactionId());
+ m.setTransactionId(clientMessage.getTransactionId());
resultSender = new ServerToClientFunctionResultSender65(m,
MessageType.EXECUTE_REGION_FUNCTION_RESULT, servConn, functionObject, executeContext);
@@ -276,7 +277,7 @@ public class ExecuteRegionFunction66 extends BaseCommand {
} else {
execution.execute(functionObject);
}
- writeReply(msg, servConn);
+ writeReply(clientMessage, servConn);
}
} catch (IOException ioe) {
logger.warn(LocalizedMessage.create(
@@ -284,7 +285,7 @@ public class ExecuteRegionFunction66 extends BaseCommand {
function), ioe);
final String message = LocalizedStrings.ExecuteRegionFunction_SERVER_COULD_NOT_SEND_THE_REPLY
.toLocalizedString();
- sendException(hasResult, msg, message, servConn, ioe);
+ sendException(hasResult, clientMessage, message, servConn, ioe);
} catch (FunctionException fe) {
String message = fe.getMessage();
Object cause = fe.getCause();
@@ -321,7 +322,7 @@ public class ExecuteRegionFunction66 extends BaseCommand {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
function), fe);
- sendException(hasResult, msg, message, servConn, fe);
+ sendException(hasResult, clientMessage, message, servConn, fe);
}
} catch (Exception e) {
@@ -329,7 +330,7 @@ public class ExecuteRegionFunction66 extends BaseCommand {
LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
function), e);
String message = e.getMessage();
- sendException(hasResult, msg, message, servConn, e);
+ sendException(hasResult, clientMessage, message, servConn, e);
} finally {
handShake.setClientReadTimeout(earlierClientReadTimeout);
ServerConnection.executeFunctionOnLocalNodeOnly((byte) 0);
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunctionSingleHop.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunctionSingleHop.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunctionSingleHop.java
index 8b2cf75..cf96137 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunctionSingleHop.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunctionSingleHop.java
@@ -62,7 +62,8 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand {
private ExecuteRegionFunctionSingleHop() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
+ public void cmdExecute(Message clientMessage, ServerConnection servConn, long start)
+ throws IOException {
String regionName = null;
Object function = null;
@@ -79,7 +80,7 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand {
CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
int functionTimeout = ConnectionImpl.DEFAULT_CLIENT_FUNCTION_TIMEOUT;
try {
- byte[] bytes = msg.getPart(0).getSerializedForm();
+ byte[] bytes = clientMessage.getPart(0).getSerializedForm();
functionState = bytes[0];
if (bytes.length >= 5
&& servConn.getClientVersion().ordinal() >= Version.GFE_8009.ordinal()) {
@@ -94,49 +95,49 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand {
servConn.setAsTrue(REQUIRES_RESPONSE);
servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
}
- regionName = msg.getPart(1).getString();
- function = msg.getPart(2).getStringOrObject();
- args = msg.getPart(3).getObject();
- Part part = msg.getPart(4);
+ regionName = clientMessage.getPart(1).getString();
+ function = clientMessage.getPart(2).getStringOrObject();
+ args = clientMessage.getPart(3).getObject();
+ Part part = clientMessage.getPart(4);
if (part != null) {
Object obj = part.getObject();
if (obj instanceof MemberMappedArgument) {
memberMappedArg = (MemberMappedArgument) obj;
}
}
- isExecuteOnAllBuckets = msg.getPart(5).getSerializedForm()[0];
+ isExecuteOnAllBuckets = clientMessage.getPart(5).getSerializedForm()[0];
if (isExecuteOnAllBuckets == 1) {
filter = new HashSet();
- bucketIdsSize = msg.getPart(6).getInt();
+ bucketIdsSize = clientMessage.getPart(6).getInt();
if (bucketIdsSize != 0) {
buckets = new HashSet<Integer>();
partNumber = 7;
for (int i = 0; i < bucketIdsSize; i++) {
- buckets.add(msg.getPart(partNumber + i).getInt());
+ buckets.add(clientMessage.getPart(partNumber + i).getInt());
}
}
partNumber = 7 + bucketIdsSize;
} else {
- filterSize = msg.getPart(6).getInt();
+ filterSize = clientMessage.getPart(6).getInt();
if (filterSize != 0) {
filter = new HashSet<Object>();
partNumber = 7;
for (int i = 0; i < filterSize; i++) {
- filter.add(msg.getPart(partNumber + i).getStringOrObject());
+ filter.add(clientMessage.getPart(partNumber + i).getStringOrObject());
}
}
partNumber = 7 + filterSize;
}
- removedNodesSize = msg.getPart(partNumber).getInt();
+ removedNodesSize = clientMessage.getPart(partNumber).getInt();
if (removedNodesSize != 0) {
removedNodesSet = new HashSet<Object>();
partNumber = partNumber + 1;
for (int i = 0; i < removedNodesSize; i++) {
- removedNodesSet.add(msg.getPart(partNumber + i).getStringOrObject());
+ removedNodesSet.add(clientMessage.getPart(partNumber + i).getStringOrObject());
}
}
@@ -145,7 +146,7 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand {
LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
function), exception);
if (hasResult == 1) {
- writeChunkedException(msg, exception, false, servConn);
+ writeChunkedException(clientMessage, exception, servConn);
servConn.setAsTrue(RESPONDED);
return;
}
@@ -163,7 +164,7 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand {
.toLocalizedString("region");
}
logger.warn("{}: {}", servConn.getName(), message);
- sendError(hasResult, msg, message, servConn);
+ sendError(hasResult, clientMessage, message, servConn);
return;
}
@@ -173,7 +174,7 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand {
LocalizedStrings.ExecuteRegionFunction_THE_REGION_NAMED_0_WAS_NOT_FOUND_DURING_EXECUTE_FUNCTION_REQUEST
.toLocalizedString(regionName);
logger.warn("{}: {}", servConn.getName(), message);
- sendError(hasResult, msg, message, servConn);
+ sendError(hasResult, clientMessage, message, servConn);
return;
}
HandShake handShake = (HandShake) servConn.getHandshake();
@@ -189,7 +190,7 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand {
LocalizedStrings.ExecuteRegionFunction_THE_FUNCTION_0_HAS_NOT_BEEN_REGISTERED
.toLocalizedString(function);
logger.warn("{}: {}", servConn.getName(), message);
- sendError(hasResult, msg, message, servConn);
+ sendError(hasResult, clientMessage, message, servConn);
return;
} else {
byte functionStateOnServer = AbstractExecution.getFunctionState(functionObject.isHA(),
@@ -199,7 +200,7 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand {
LocalizedStrings.FunctionService_FUNCTION_ATTRIBUTE_MISMATCH_CLIENT_SERVER
.toLocalizedString(function);
logger.warn("{}: {}", servConn.getName(), message);
- sendError(hasResult, msg, message, servConn);
+ sendError(hasResult, clientMessage, message, servConn);
return;
}
}
@@ -222,7 +223,7 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand {
// Construct execution
AbstractExecution execution = (AbstractExecution) FunctionService.onRegion(region);
ChunkedMessage m = servConn.getFunctionResponseMessage();
- m.setTransactionId(msg.getTransactionId());
+ m.setTransactionId(clientMessage.getTransactionId());
resultSender = new ServerToClientFunctionResultSender65(m,
MessageType.EXECUTE_REGION_FUNCTION_RESULT, servConn, functionObject, executeContext);
@@ -290,7 +291,7 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand {
function), ioe);
final String message = LocalizedStrings.ExecuteRegionFunction_SERVER_COULD_NOT_SEND_THE_REPLY
.toLocalizedString();
- sendException(hasResult, msg, message, servConn, ioe);
+ sendException(hasResult, clientMessage, message, servConn, ioe);
} catch (FunctionException fe) {
String message = fe.getMessage();
@@ -301,21 +302,21 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand {
logger.debug("Exception on server while executing function: {}: {}", function, message,
fe);
}
- synchronized (msg) {
+ synchronized (clientMessage) {
resultSender.setException(fe);
}
} else {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
function), fe);
- sendException(hasResult, msg, message, servConn, fe);
+ sendException(hasResult, clientMessage, message, servConn, fe);
}
} catch (Exception e) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0,
function), e);
String message = e.getMessage();
- sendException(hasResult, msg, message, servConn, e);
+ sendException(hasResult, clientMessage, message, servConn, e);
} finally {
handShake.setClientReadTimeout(earlierClientReadTimeout);
ServerConnection.executeFunctionOnLocalNodeOnly((byte) 0);
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
index d44a4ad..d489b88 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
@@ -79,14 +79,14 @@ public class GatewayReceiverCommand extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, InterruptedException {
Part regionNamePart = null, keyPart = null, valuePart = null, callbackArgPart = null;
String regionName = null;
Object callbackArg = null, key = null;
int partNumber = 0;
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- GatewayReceiverStats stats = (GatewayReceiverStats) servConn.getCacheServerStats();
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ GatewayReceiverStats stats = (GatewayReceiverStats) serverConnection.getCacheServerStats();
EventID eventId = null;
LocalRegion region = null;
List<BatchException70> exceptions = new ArrayList<BatchException70>();
@@ -102,20 +102,20 @@ public class GatewayReceiverCommand extends BaseCommand {
// statement so that all messages can take advantage of it.
boolean earlyAck = false;// msg.getEarlyAck();
- stats.incBatchSize(msg.getPayloadLength());
+ stats.incBatchSize(clientMessage.getPayloadLength());
// Retrieve the number of events
- Part numberOfEventsPart = msg.getPart(0);
+ Part numberOfEventsPart = clientMessage.getPart(0);
int numberOfEvents = numberOfEventsPart.getInt();
stats.incEventsReceived(numberOfEvents);
// Retrieve the batch id
- Part batchIdPart = msg.getPart(1);
+ Part batchIdPart = clientMessage.getPart(1);
int batchId = batchIdPart.getInt();
// If this batch has already been seen, do not reply.
// Instead, drop the batch and continue.
- if (batchId <= servConn.getLatestBatchIdReplied()) {
+ if (batchId <= serverConnection.getLatestBatchIdReplied()) {
if (GatewayReceiver.APPLY_RETRIES) {
// Do nothing!!!
logger.warn(LocalizedMessage.create(
@@ -125,17 +125,17 @@ public class GatewayReceiverCommand extends BaseCommand {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ProcessBatch_RECEIVED_PROCESS_BATCH_REQUEST_0_THAT_HAS_ALREADY_BEEN_OR_IS_BEING_PROCESSED__THIS_PROCESS_BATCH_REQUEST_IS_BEING_IGNORED,
batchId));
- writeReply(msg, servConn, batchId, numberOfEvents);
+ writeReply(clientMessage, serverConnection, batchId, numberOfEvents);
return;
}
stats.incDuplicateBatchesReceived();
}
// Verify the batches arrive in order
- if (batchId != servConn.getLatestBatchIdReplied() + 1) {
+ if (batchId != serverConnection.getLatestBatchIdReplied() + 1) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ProcessBatch_RECEIVED_PROCESS_BATCH_REQUEST_0_OUT_OF_ORDER_THE_ID_OF_THE_LAST_BATCH_PROCESSED_WAS_1_THIS_BATCH_REQUEST_WILL_BE_PROCESSED_BUT_SOME_MESSAGES_MAY_HAVE_BEEN_LOST,
- new Object[] {batchId, servConn.getLatestBatchIdReplied()}));
+ new Object[] {batchId, serverConnection.getLatestBatchIdReplied()}));
stats.incOutoforderBatchesReceived();
}
@@ -146,7 +146,7 @@ public class GatewayReceiverCommand extends BaseCommand {
// If early ack mode, acknowledge right away
// Not sure if earlyAck makes sense with sliding window
if (earlyAck) {
- servConn.incrementLatestBatchIdReplied(batchId);
+ serverConnection.incrementLatestBatchIdReplied(batchId);
// writeReply(msg, servConn);
// servConn.setAsTrue(RESPONDED);
@@ -162,13 +162,13 @@ public class GatewayReceiverCommand extends BaseCommand {
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Received process batch request {} containing {} events ({} bytes) with {} acknowledgement on {}",
- servConn.getName(), batchId, numberOfEvents, msg.getPayloadLength(),
- (earlyAck ? "early" : "normal"), servConn.getSocketString());
+ serverConnection.getName(), batchId, numberOfEvents, clientMessage.getPayloadLength(),
+ (earlyAck ? "early" : "normal"), serverConnection.getSocketString());
if (earlyAck) {
logger.debug(
"{}: Sent process batch early response for batch {} containing {} events ({} bytes) with {} acknowledgement on {}",
- servConn.getName(), batchId, numberOfEvents, msg.getPayloadLength(),
- (earlyAck ? "early" : "normal"), servConn.getSocketString());
+ serverConnection.getName(), batchId, numberOfEvents, clientMessage.getPayloadLength(),
+ (earlyAck ? "early" : "normal"), serverConnection.getSocketString());
}
}
// logger.warn("Received process batch request " + batchId + " containing
@@ -185,10 +185,10 @@ public class GatewayReceiverCommand extends BaseCommand {
// Retrieve the events from the message parts. The '2' below
// represents the number of events (part0) and the batchId (part1)
partNumber = 2;
- int dsid = msg.getPart(partNumber++).getInt();
+ int dsid = clientMessage.getPart(partNumber++).getInt();
boolean removeOnException =
- msg.getPart(partNumber++).getSerializedForm()[0] == 1 ? true : false;
+ clientMessage.getPart(partNumber++).getSerializedForm()[0] == 1 ? true : false;
// Keep track of whether a response has been written for
// exceptions
@@ -202,7 +202,7 @@ public class GatewayReceiverCommand extends BaseCommand {
indexWithoutPDXEvent++;
// System.out.println("Processing event " + i + " in batch " + batchId + "
// starting with part number " + partNumber);
- Part actionTypePart = msg.getPart(partNumber);
+ Part actionTypePart = clientMessage.getPart(partNumber);
int actionType = actionTypePart.getInt();
long versionTimeStamp = VersionTag.ILLEGAL_VERSION_TIMESTAMP;
@@ -211,14 +211,14 @@ public class GatewayReceiverCommand extends BaseCommand {
boolean callbackArgExists = false;
try {
- Part possibleDuplicatePart = msg.getPart(partNumber + 1);
+ Part possibleDuplicatePart = clientMessage.getPart(partNumber + 1);
byte[] possibleDuplicatePartBytes;
try {
possibleDuplicatePartBytes = (byte[]) possibleDuplicatePart.getObject();
} catch (Exception e) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS,
- new Object[] {servConn.getName(), Integer.valueOf(batchId),
+ new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
Integer.valueOf(numberOfEvents)}),
e);
throw e;
@@ -231,7 +231,7 @@ public class GatewayReceiverCommand extends BaseCommand {
callbackArg = null;
// Retrieve the region name from the message parts
- regionNamePart = msg.getPart(partNumber + 2);
+ regionNamePart = clientMessage.getPart(partNumber + 2);
regionName = regionNamePart.getString();
if (regionName.equals(PeerTypeRegistration.REGION_FULL_PATH)) {
indexWithoutPDXEvent--;
@@ -243,28 +243,28 @@ public class GatewayReceiverCommand extends BaseCommand {
// duplication of events, but it is unused now. In
// fact the event id is overridden by the FROM_GATEWAY
// token.
- Part eventIdPart = msg.getPart(partNumber + 3);
- eventIdPart.setVersion(servConn.getClientVersion());
+ Part eventIdPart = clientMessage.getPart(partNumber + 3);
+ eventIdPart.setVersion(serverConnection.getClientVersion());
// String eventId = eventIdPart.getString();
try {
eventId = (EventID) eventIdPart.getObject();
} catch (Exception e) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS,
- new Object[] {servConn.getName(), Integer.valueOf(batchId),
+ new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
Integer.valueOf(numberOfEvents)}),
e);
throw e;
}
// Retrieve the key from the message parts
- keyPart = msg.getPart(partNumber + 4);
+ keyPart = clientMessage.getPart(partNumber + 4);
try {
key = keyPart.getStringOrObject();
} catch (Exception e) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS,
- new Object[] {servConn.getName(), Integer.valueOf(batchId),
+ new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
Integer.valueOf(numberOfEvents)}),
e);
throw e;
@@ -281,7 +281,7 @@ public class GatewayReceiverCommand extends BaseCommand {
*/
// Retrieve the value from the message parts (do not deserialize it)
- valuePart = msg.getPart(partNumber + 5);
+ valuePart = clientMessage.getPart(partNumber + 5);
// try {
// logger.warn(getName() + ": Creating key " + key + " value " +
// valuePart.getObject());
@@ -289,18 +289,18 @@ public class GatewayReceiverCommand extends BaseCommand {
// Retrieve the callbackArg from the message parts if necessary
int index = partNumber + 6;
- callbackArgExistsPart = msg.getPart(index++); {
+ callbackArgExistsPart = clientMessage.getPart(index++); {
byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
callbackArgExists = partBytes[0] == 0x01;
}
if (callbackArgExists) {
- callbackArgPart = msg.getPart(index++);
+ callbackArgPart = clientMessage.getPart(index++);
try {
callbackArg = callbackArgPart.getObject();
} catch (Exception e) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_CREATE_REQUEST_1_FOR_2_EVENTS,
- new Object[] {servConn.getName(), Integer.valueOf(batchId),
+ new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
Integer.valueOf(numberOfEvents)}),
e);
throw e;
@@ -309,14 +309,15 @@ public class GatewayReceiverCommand extends BaseCommand {
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Processing batch create request {} on {} for region {} key {} value {} callbackArg {}, eventId={}",
- servConn.getName(), batchId, servConn.getSocketString(), regionName, key,
- valuePart, callbackArg, eventId);
+ serverConnection.getName(), batchId, serverConnection.getSocketString(),
+ regionName, key, valuePart, callbackArg, eventId);
}
- versionTimeStamp = msg.getPart(index++).getLong();
+ versionTimeStamp = clientMessage.getPart(index++).getLong();
// Process the create request
if (key == null || regionName == null) {
StringId message = null;
- Object[] messageArgs = new Object[] {servConn.getName(), Integer.valueOf(batchId)};
+ Object[] messageArgs =
+ new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
if (key == null) {
message =
LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_CREATE_REQUEST_1_IS_NULL;
@@ -331,7 +332,7 @@ public class GatewayReceiverCommand extends BaseCommand {
}
region = (LocalRegion) crHelper.getRegion(regionName);
if (region == null) {
- handleRegionNull(servConn, regionName, batchId);
+ handleRegionNull(serverConnection, regionName, batchId);
} else {
clientEvent = new EventIDHolder(eventId);
if (versionTimeStamp > 0) {
@@ -348,7 +349,7 @@ public class GatewayReceiverCommand extends BaseCommand {
boolean isObject = valuePart.isObject();
// [sumedh] This should be done on client while sending
// since that is the WAN gateway
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
PutOperationContext putContext =
authzRequest.putAuthorize(regionName, key, value, isObject, callbackArg);
@@ -361,29 +362,29 @@ public class GatewayReceiverCommand extends BaseCommand {
result = addPdxType(crHelper, key, value);
} else {
result = region.basicBridgeCreate(key, value, isObject, callbackArg,
- servConn.getProxyID(), false, clientEvent, false);
+ serverConnection.getProxyID(), false, clientEvent, false);
// If the create fails (presumably because it already exists),
// attempt to update the entry
if (!result) {
result = region.basicBridgePut(key, value, null, isObject, callbackArg,
- servConn.getProxyID(), false, clientEvent);
+ serverConnection.getProxyID(), false, clientEvent);
}
}
if (result || clientEvent.isConcurrencyConflict()) {
- servConn.setModificationInfo(true, regionName, key);
+ serverConnection.setModificationInfo(true, regionName, key);
stats.incCreateRequest();
} else {
// This exception will be logged in the catch block below
throw new Exception(
LocalizedStrings.ProcessBatch_0_FAILED_TO_CREATE_OR_UPDATE_ENTRY_FOR_REGION_1_KEY_2_VALUE_3_CALLBACKARG_4
- .toLocalizedString(new Object[] {servConn.getName(), regionName, key,
- valuePart, callbackArg}));
+ .toLocalizedString(new Object[] {serverConnection.getName(), regionName,
+ key, valuePart, callbackArg}));
}
} catch (Exception e) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_CREATE_REQUEST_1_FOR_2_EVENTS,
- new Object[] {servConn.getName(), Integer.valueOf(batchId),
+ new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
Integer.valueOf(numberOfEvents)}),
e);
throw e;
@@ -400,7 +401,7 @@ public class GatewayReceiverCommand extends BaseCommand {
*/
// Retrieve the value from the message parts (do not deserialize it)
- valuePart = msg.getPart(partNumber + 5);
+ valuePart = clientMessage.getPart(partNumber + 5);
// try {
// logger.warn(getName() + ": Updating key " + key + " value " +
// valuePart.getObject());
@@ -408,34 +409,35 @@ public class GatewayReceiverCommand extends BaseCommand {
// Retrieve the callbackArg from the message parts if necessary
index = partNumber + 6;
- callbackArgExistsPart = msg.getPart(index++); {
+ callbackArgExistsPart = clientMessage.getPart(index++); {
byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
callbackArgExists = partBytes[0] == 0x01;
}
if (callbackArgExists) {
- callbackArgPart = msg.getPart(index++);
+ callbackArgPart = clientMessage.getPart(index++);
try {
callbackArg = callbackArgPart.getObject();
} catch (Exception e) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_REQUEST_1_CONTAINING_2_EVENTS,
- new Object[] {servConn.getName(), Integer.valueOf(batchId),
+ new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
Integer.valueOf(numberOfEvents)}),
e);
throw e;
}
}
- versionTimeStamp = msg.getPart(index++).getLong();
+ versionTimeStamp = clientMessage.getPart(index++).getLong();
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Processing batch update request {} on {} for region {} key {} value {} callbackArg {}",
- servConn.getName(), batchId, servConn.getSocketString(), regionName, key,
- valuePart, callbackArg);
+ serverConnection.getName(), batchId, serverConnection.getSocketString(),
+ regionName, key, valuePart, callbackArg);
}
// Process the update request
if (key == null || regionName == null) {
StringId message = null;
- Object[] messageArgs = new Object[] {servConn.getName(), Integer.valueOf(batchId)};
+ Object[] messageArgs =
+ new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
if (key == null) {
message =
LocalizedStrings.ProcessBatch_0_THE_INPUT_KEY_FOR_THE_BATCH_UPDATE_REQUEST_1_IS_NULL;
@@ -450,7 +452,7 @@ public class GatewayReceiverCommand extends BaseCommand {
}
region = (LocalRegion) crHelper.getRegion(regionName);
if (region == null) {
- handleRegionNull(servConn, regionName, batchId);
+ handleRegionNull(serverConnection, regionName, batchId);
} else {
clientEvent = new EventIDHolder(eventId);
if (versionTimeStamp > 0) {
@@ -465,7 +467,7 @@ public class GatewayReceiverCommand extends BaseCommand {
try {
byte[] value = valuePart.getSerializedForm();
boolean isObject = valuePart.isObject();
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
PutOperationContext putContext = authzRequest.putAuthorize(regionName, key, value,
isObject, callbackArg, PutOperationContext.UPDATE);
@@ -477,14 +479,14 @@ public class GatewayReceiverCommand extends BaseCommand {
result = addPdxType(crHelper, key, value);
} else {
result = region.basicBridgePut(key, value, null, isObject, callbackArg,
- servConn.getProxyID(), false, clientEvent);
+ serverConnection.getProxyID(), false, clientEvent);
}
if (result || clientEvent.isConcurrencyConflict()) {
- servConn.setModificationInfo(true, regionName, key);
+ serverConnection.setModificationInfo(true, regionName, key);
stats.incUpdateRequest();
} else {
- final Object[] msgArgs =
- new Object[] {servConn.getName(), regionName, key, valuePart, callbackArg};
+ final Object[] msgArgs = new Object[] {serverConnection.getName(), regionName,
+ key, valuePart, callbackArg};
final StringId message =
LocalizedStrings.ProcessBatch_0_FAILED_TO_UPDATE_ENTRY_FOR_REGION_1_KEY_2_VALUE_3_AND_CALLBACKARG_4;
String s = message.toLocalizedString(msgArgs);
@@ -493,16 +495,17 @@ public class GatewayReceiverCommand extends BaseCommand {
}
} catch (CancelException e) {
// FIXME better exception hierarchy would avoid this check
- if (servConn.getCachedRegionHelper().getCache().getCancelCriterion()
+ if (serverConnection.getCachedRegionHelper().getCache().getCancelCriterion()
.isCancelInProgress()) {
if (logger.isDebugEnabled()) {
logger.debug(
"{} ignoring message of type {} from client {} because shutdown occurred during message processing.",
- servConn.getName(), MessageType.getString(msg.getMessageType()),
- servConn.getProxyID());
+ serverConnection.getName(),
+ MessageType.getString(clientMessage.getMessageType()),
+ serverConnection.getProxyID());
}
- servConn.setFlagProcessMessagesAsFalse();
- servConn.setClientDisconnectedException(e);
+ serverConnection.setFlagProcessMessagesAsFalse();
+ serverConnection.setClientDisconnectedException(e);
} else {
throw e;
}
@@ -511,7 +514,7 @@ public class GatewayReceiverCommand extends BaseCommand {
// Preserve the connection under all circumstances
logger.warn(LocalizedMessage.create(
LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_REQUEST_1_CONTAINING_2_EVENTS,
- new Object[] {servConn.getName(), Integer.valueOf(batchId),
+ new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
Integer.valueOf(numberOfEvents)}),
e);
throw e;
@@ -521,28 +524,29 @@ public class GatewayReceiverCommand extends BaseCommand {
case 2: // Destroy
// Retrieve the callbackArg from the message parts if necessary
index = partNumber + 5;
- callbackArgExistsPart = msg.getPart(index++); {
+ callbackArgExistsPart = clientMessage.getPart(index++); {
byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
callbackArgExists = partBytes[0] == 0x01;
}
if (callbackArgExists) {
- callbackArgPart = msg.getPart(index++);
+ callbackArgPart = clientMessage.getPart(index++);
try {
callbackArg = callbackArgPart.getObject();
} catch (Exception e) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_DESTROY_REQUEST_1_CONTAINING_2_EVENTS,
- new Object[] {servConn.getName(), Integer.valueOf(batchId),
+ new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
Integer.valueOf(numberOfEvents)}),
e);
throw e;
}
}
- versionTimeStamp = msg.getPart(index++).getLong();
+ versionTimeStamp = clientMessage.getPart(index++).getLong();
if (logger.isDebugEnabled()) {
logger.debug("{}: Processing batch destroy request {} on {} for region {} key {}",
- servConn.getName(), batchId, servConn.getSocketString(), regionName, key);
+ serverConnection.getName(), batchId, serverConnection.getSocketString(),
+ regionName, key);
}
// Process the destroy request
@@ -556,14 +560,15 @@ public class GatewayReceiverCommand extends BaseCommand {
message =
LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_DESTROY_REQUEST_1_IS_NULL;
}
- Object[] messageArgs = new Object[] {servConn.getName(), Integer.valueOf(batchId)};
+ Object[] messageArgs =
+ new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
String s = message.toLocalizedString(messageArgs);
logger.warn(s);
throw new Exception(s);
}
region = (LocalRegion) crHelper.getRegion(regionName);
if (region == null) {
- handleRegionNull(servConn, regionName, batchId);
+ handleRegionNull(serverConnection, regionName, batchId);
} else {
clientEvent = new EventIDHolder(eventId);
if (versionTimeStamp > 0) {
@@ -576,20 +581,20 @@ public class GatewayReceiverCommand extends BaseCommand {
handleMessageRetry(region, clientEvent);
// Destroy the entry
try {
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
DestroyOperationContext destroyContext =
authzRequest.destroyAuthorize(regionName, key, callbackArg);
callbackArg = destroyContext.getCallbackArg();
}
- region.basicBridgeDestroy(key, callbackArg, servConn.getProxyID(), false,
+ region.basicBridgeDestroy(key, callbackArg, serverConnection.getProxyID(), false,
clientEvent);
- servConn.setModificationInfo(true, regionName, key);
+ serverConnection.setModificationInfo(true, regionName, key);
stats.incDestroyRequest();
} catch (EntryNotFoundException e) {
logger.info(LocalizedMessage.create(
LocalizedStrings.ProcessBatch_0_DURING_BATCH_DESTROY_NO_ENTRY_WAS_FOUND_FOR_KEY_1,
- new Object[] {servConn.getName(), key}));
+ new Object[] {serverConnection.getName(), key}));
// throw new Exception(e);
}
}
@@ -598,52 +603,52 @@ public class GatewayReceiverCommand extends BaseCommand {
try {
// Region name
- regionNamePart = msg.getPart(partNumber + 2);
+ regionNamePart = clientMessage.getPart(partNumber + 2);
regionName = regionNamePart.getString();
// Retrieve the event id from the message parts
- eventIdPart = msg.getPart(partNumber + 3);
+ eventIdPart = clientMessage.getPart(partNumber + 3);
eventId = (EventID) eventIdPart.getObject();
// Retrieve the key from the message parts
- keyPart = msg.getPart(partNumber + 4);
+ keyPart = clientMessage.getPart(partNumber + 4);
key = keyPart.getStringOrObject();
// Retrieve the callbackArg from the message parts if necessary
index = partNumber + 5;
- callbackArgExistsPart = msg.getPart(index++);
+ callbackArgExistsPart = clientMessage.getPart(index++);
byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
callbackArgExists = partBytes[0] == 0x01;
if (callbackArgExists) {
- callbackArgPart = msg.getPart(index++);
+ callbackArgPart = clientMessage.getPart(index++);
callbackArg = callbackArgPart.getObject();
}
} catch (Exception e) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_VERSION_REQUEST_1_CONTAINING_2_EVENTS,
- new Object[] {servConn.getName(), Integer.valueOf(batchId),
+ new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
Integer.valueOf(numberOfEvents)}),
e);
throw e;
}
- versionTimeStamp = msg.getPart(index++).getLong();
+ versionTimeStamp = clientMessage.getPart(index++).getLong();
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Processing batch update-version request {} on {} for region {} key {} value {} callbackArg {}",
- servConn.getName(), batchId, servConn.getSocketString(), regionName, key,
- valuePart, callbackArg);
+ serverConnection.getName(), batchId, serverConnection.getSocketString(),
+ regionName, key, valuePart, callbackArg);
}
// Process the update time-stamp request
if (key == null || regionName == null) {
StringId message =
LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_VERSION_REQUEST_1_CONTAINING_2_EVENTS;
- Object[] messageArgs = new Object[] {servConn.getName(), Integer.valueOf(batchId),
- Integer.valueOf(numberOfEvents)};
+ Object[] messageArgs = new Object[] {serverConnection.getName(),
+ Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)};
String s = message.toLocalizedString(messageArgs);
logger.warn(s);
throw new Exception(s);
@@ -652,7 +657,7 @@ public class GatewayReceiverCommand extends BaseCommand {
region = (LocalRegion) crHelper.getRegion(regionName);
if (region == null) {
- handleRegionNull(servConn, regionName, batchId);
+ handleRegionNull(serverConnection, regionName, batchId);
} else {
clientEvent = new EventIDHolder(eventId);
@@ -668,13 +673,13 @@ public class GatewayReceiverCommand extends BaseCommand {
// Update the version tag
try {
- region.basicBridgeUpdateVersionStamp(key, callbackArg, servConn.getProxyID(),
- false, clientEvent);
+ region.basicBridgeUpdateVersionStamp(key, callbackArg,
+ serverConnection.getProxyID(), false, clientEvent);
} catch (EntryNotFoundException e) {
logger.info(LocalizedMessage.create(
LocalizedStrings.ProcessBatch_0_DURING_BATCH_UPDATE_VERSION_NO_ENTRY_WAS_FOUND_FOR_KEY_1,
- new Object[] {servConn.getName(), key}));
+ new Object[] {serverConnection.getName(), key}));
// throw new Exception(e);
}
}
@@ -684,29 +689,29 @@ public class GatewayReceiverCommand extends BaseCommand {
default:
logger.fatal(LocalizedMessage.create(
LocalizedStrings.Processbatch_0_UNKNOWN_ACTION_TYPE_1_FOR_BATCH_FROM_2,
- new Object[] {servConn.getName(), Integer.valueOf(actionType),
- servConn.getSocketString()}));
+ new Object[] {serverConnection.getName(), Integer.valueOf(actionType),
+ serverConnection.getSocketString()}));
stats.incUnknowsOperationsReceived();
}
} catch (CancelException e) {
if (logger.isDebugEnabled()) {
logger.debug(
"{} ignoring message of type {} from client {} because shutdown occurred during message processing.",
- servConn.getName(), MessageType.getString(msg.getMessageType()),
- servConn.getProxyID());
+ serverConnection.getName(), MessageType.getString(clientMessage.getMessageType()),
+ serverConnection.getProxyID());
}
- servConn.setFlagProcessMessagesAsFalse();
- servConn.setClientDisconnectedException(e);
+ serverConnection.setFlagProcessMessagesAsFalse();
+ serverConnection.setClientDisconnectedException(e);
return;
} catch (Exception e) {
// If an interrupted exception is thrown , rethrow it
- checkForInterrupt(servConn, e);
+ checkForInterrupt(serverConnection, e);
// If we have an issue with the PDX registry, stop processing more data
if (e.getCause() instanceof PdxRegistryMismatchException) {
fatalException = e.getCause();
logger.fatal(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_PDX_CONFIGURATION,
- new Object[] {servConn.getMembershipID()}), e.getCause());
+ new Object[] {serverConnection.getMembershipID()}), e.getCause());
break;
}
@@ -772,26 +777,26 @@ public class GatewayReceiverCommand extends BaseCommand {
stats.incProcessBatchTime(start - oldStart);
}
if (fatalException != null) {
- servConn.incrementLatestBatchIdReplied(batchId);
- writeFatalException(msg, fatalException, servConn, batchId);
- servConn.setAsTrue(RESPONDED);
+ serverConnection.incrementLatestBatchIdReplied(batchId);
+ writeFatalException(clientMessage, fatalException, serverConnection, batchId);
+ serverConnection.setAsTrue(RESPONDED);
} else if (!exceptions.isEmpty()) {
- servConn.incrementLatestBatchIdReplied(batchId);
- writeBatchException(msg, exceptions, servConn, batchId);
- servConn.setAsTrue(RESPONDED);
+ serverConnection.incrementLatestBatchIdReplied(batchId);
+ writeBatchException(clientMessage, exceptions, serverConnection, batchId);
+ serverConnection.setAsTrue(RESPONDED);
} else if (!wroteResponse) {
// Increment the batch id unless the received batch id is -1 (a failover
// batch)
- servConn.incrementLatestBatchIdReplied(batchId);
+ serverConnection.incrementLatestBatchIdReplied(batchId);
- writeReply(msg, servConn, batchId, numberOfEvents);
- servConn.setAsTrue(RESPONDED);
+ writeReply(clientMessage, serverConnection, batchId, numberOfEvents);
+ serverConnection.setAsTrue(RESPONDED);
stats.incWriteProcessBatchResponseTime(DistributionStats.getStatTime() - start);
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Sent process batch normal response for batch {} containing {} events ({} bytes) with {} acknowledgement on {}",
- servConn.getName(), batchId, numberOfEvents, msg.getPayloadLength(),
- (earlyAck ? "early" : "normal"), servConn.getSocketString());
+ serverConnection.getName(), batchId, numberOfEvents, clientMessage.getPayloadLength(),
+ (earlyAck ? "early" : "normal"), serverConnection.getSocketString());
}
// logger.warn("Sent process batch normal response for batch " +
// batchId + " containing " + numberOfEvents + " events (" +
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java
index 5cb1e41..2ca8804 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java
@@ -54,17 +54,17 @@ public class Get70 extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long startparam)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long startparam)
throws IOException {
long start = startparam;
Part regionNamePart = null, keyPart = null, valuePart = null;
String regionName = null;
Object callbackArg = null, key = null;
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- CacheServerStats stats = servConn.getCacheServerStats();
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
StringId errMessage = null;
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
// requiresResponse = true;
{
long oldStart = start;
@@ -72,18 +72,18 @@ public class Get70 extends BaseCommand {
stats.incReadGetRequestTime(start - oldStart);
}
// Retrieve the data from the message parts
- int parts = msg.getNumberOfParts();
- regionNamePart = msg.getPart(0);
- keyPart = msg.getPart(1);
+ int parts = clientMessage.getNumberOfParts();
+ regionNamePart = clientMessage.getPart(0);
+ keyPart = clientMessage.getPart(1);
// valuePart = null; (redundant assignment)
if (parts > 2) {
- valuePart = msg.getPart(2);
+ valuePart = clientMessage.getPart(2);
try {
callbackArg = valuePart.getObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
+ writeException(clientMessage, e, false, serverConnection);
// responded = true;
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
@@ -91,15 +91,15 @@ public class Get70 extends BaseCommand {
try {
key = keyPart.getStringOrObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
+ writeException(clientMessage, e, false, serverConnection);
// responded = true;
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
if (logger.isDebugEnabled()) {
logger.debug("{}: Received 7.0 get request ({} bytes) from {} for region {} key {} txId {}",
- servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key,
- msg.getTransactionId());
+ serverConnection.getName(), clientMessage.getPayloadLength(),
+ serverConnection.getSocketString(), regionName, key, clientMessage.getTransactionId());
}
// Process the get request
@@ -113,18 +113,18 @@ public class Get70 extends BaseCommand {
errMessage = LocalizedStrings.Request_THE_INPUT_REGION_NAME_FOR_THE_GET_REQUEST_IS_NULL;
}
String s = errMessage.toLocalizedString();
- logger.warn("{}: {}", servConn.getName(), s);
- writeErrorResponse(msg, MessageType.REQUESTDATAERROR, s, servConn);
- servConn.setAsTrue(RESPONDED);
+ logger.warn("{}: {}", serverConnection.getName(), s);
+ writeErrorResponse(clientMessage, MessageType.REQUESTDATAERROR, s, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- Region region = servConn.getCache().getRegion(regionName);
+ Region region = serverConnection.getCache().getRegion(regionName);
if (region == null) {
String reason = LocalizedStrings.Request__0_WAS_NOT_FOUND_DURING_GET_REQUEST
.toLocalizedString(regionName);
- writeRegionDestroyedEx(msg, regionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -133,14 +133,14 @@ public class Get70 extends BaseCommand {
// for integrated security
this.securityService.authorizeRegionRead(regionName, key.toString());
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
getContext = authzRequest.getAuthorize(regionName, key, callbackArg);
callbackArg = getContext.getCallbackArg();
}
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -148,10 +148,10 @@ public class Get70 extends BaseCommand {
// the value if it is a byte[].
Entry entry;
try {
- entry = getEntry(region, key, callbackArg, servConn);
+ entry = getEntry(region, key, callbackArg, serverConnection);
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -164,7 +164,7 @@ public class Get70 extends BaseCommand {
boolean keyNotPresent = entry.keyNotPresent;
try {
- AuthorizeRequestPP postAuthzRequest = servConn.getPostAuthzRequest();
+ AuthorizeRequestPP postAuthzRequest = serverConnection.getPostAuthzRequest();
if (postAuthzRequest != null) {
try {
getContext = postAuthzRequest.getAuthorize(regionName, key, data, isObject, getContext);
@@ -182,8 +182,8 @@ public class Get70 extends BaseCommand {
}
}
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -197,23 +197,25 @@ public class Get70 extends BaseCommand {
if (region instanceof PartitionedRegion) {
PartitionedRegion pr = (PartitionedRegion) region;
if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) {
- writeResponseWithRefreshMetadata(data, callbackArg, msg, isObject, servConn, pr,
- pr.getNetworkHopType(), versionTag, keyNotPresent);
+ writeResponseWithRefreshMetadata(data, callbackArg, clientMessage, isObject,
+ serverConnection, pr, pr.getNetworkHopType(), versionTag, keyNotPresent);
pr.clearNetworkHopData();
} else {
- writeResponse(data, callbackArg, msg, isObject, versionTag, keyNotPresent, servConn);
+ writeResponse(data, callbackArg, clientMessage, isObject, versionTag, keyNotPresent,
+ serverConnection);
}
} else {
- writeResponse(data, callbackArg, msg, isObject, versionTag, keyNotPresent, servConn);
+ writeResponse(data, callbackArg, clientMessage, isObject, versionTag, keyNotPresent,
+ serverConnection);
}
} finally {
OffHeapHelper.release(originalData);
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
if (logger.isDebugEnabled()) {
- logger.debug("{}: Wrote get response back to {} for region {} {}", servConn.getName(),
- servConn.getSocketString(), regionName, entry);
+ logger.debug("{}: Wrote get response back to {} for region {} {}", serverConnection.getName(),
+ serverConnection.getSocketString(), regionName, entry);
}
stats.incWriteGetResponseTime(DistributionStats.getStatTime() - start);
@@ -379,12 +381,12 @@ public class Get70 extends BaseCommand {
}
@Override
- protected void writeReply(Message origMsg, ServerConnection servConn) throws IOException {
+ protected void writeReply(Message origMsg, ServerConnection serverConnection) throws IOException {
throw new UnsupportedOperationException();
}
@Override
- protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection servConn,
+ protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection serverConnection,
PartitionedRegion pr, byte nwHop) throws IOException {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll.java
index 22e63c6..01c5c9c 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll.java
@@ -44,33 +44,34 @@ public class GetAll extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, InterruptedException {
Part regionNamePart = null, keysPart = null;
String regionName = null;
Object[] keys = null;
- servConn.setAsTrue(REQUIRES_RESPONSE);
- servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
// Retrieve the region name from the message parts
- regionNamePart = msg.getPart(0);
+ regionNamePart = clientMessage.getPart(0);
regionName = regionNamePart.getString();
// Retrieve the keys array from the message parts
- keysPart = msg.getPart(1);
+ keysPart = clientMessage.getPart(1);
try {
keys = (Object[]) keysPart.getObject();
} catch (Exception e) {
- writeChunkedException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeChunkedException(clientMessage, e, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
if (logger.isDebugEnabled()) {
StringBuffer buffer = new StringBuffer();
- buffer.append(servConn.getName()).append(": Received getAll request (")
- .append(msg.getPayloadLength()).append(" bytes) from ").append(servConn.getSocketString())
- .append(" for region ").append(regionName).append(" keys ");
+ buffer.append(serverConnection.getName()).append(": Received getAll request (")
+ .append(clientMessage.getPayloadLength()).append(" bytes) from ")
+ .append(serverConnection.getSocketString()).append(" for region ").append(regionName)
+ .append(" keys ");
if (keys != null) {
for (int i = 0; i < keys.length; i++) {
buffer.append(keys[i]).append(" ");
@@ -91,37 +92,38 @@ public class GetAll extends BaseCommand {
message = LocalizedStrings.GetAll_THE_INPUT_REGION_NAME_FOR_THE_GETALL_REQUEST_IS_NULL
.toLocalizedString();
}
- logger.warn("{}: {}", servConn.getName(), message);
- writeChunkedErrorResponse(msg, MessageType.GET_ALL_DATA_ERROR, message, servConn);
- servConn.setAsTrue(RESPONDED);
+ logger.warn("{}: {}", serverConnection.getName(), message);
+ writeChunkedErrorResponse(clientMessage, MessageType.GET_ALL_DATA_ERROR, message,
+ serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName);
+ LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName);
if (region == null) {
String reason = " was not found during getAll request";
- writeRegionDestroyedEx(msg, regionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
// Send header
- ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
+ ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage();
chunkedResponseMsg.setMessageType(MessageType.RESPONSE);
- chunkedResponseMsg.setTransactionId(msg.getTransactionId());
+ chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId());
chunkedResponseMsg.sendHeader();
// Send chunk response
try {
- fillAndSendGetAllResponseChunks(region, regionName, keys, servConn);
- servConn.setAsTrue(RESPONDED);
+ fillAndSendGetAllResponseChunks(region, regionName, keys, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
} catch (Exception e) {
// If an interrupted exception is thrown , rethrow it
- checkForInterrupt(servConn, e);
+ checkForInterrupt(serverConnection, e);
// Otherwise, write an exception message and continue
- writeChunkedException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeChunkedException(clientMessage, e, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
@@ -142,14 +144,14 @@ public class GetAll extends BaseCommand {
numKeys = allKeys.size();
}
- ObjectPartList values = new ObjectPartList(maximumChunkSize, keys == null);
+ ObjectPartList values = new ObjectPartList(MAXIMUM_CHUNK_SIZE, keys == null);
AuthorizeRequest authzRequest = servConn.getAuthzRequest();
AuthorizeRequestPP postAuthzRequest = servConn.getPostAuthzRequest();
Request request = (Request) Request.getCommand();
Object[] valueAndIsObject = new Object[3];
for (int i = 0; i < numKeys; i++) {
// Send the intermediate chunk if necessary
- if (values.size() == maximumChunkSize) {
+ if (values.size() == MAXIMUM_CHUNK_SIZE) {
// Send the chunk and clear the list
sendGetAllResponseChunk(region, values, false, servConn);
values.clear();
@@ -246,7 +248,7 @@ public class GetAll extends BaseCommand {
ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
chunkedResponseMsg.setNumberOfParts(1);
chunkedResponseMsg.setLastChunk(lastChunk);
- chunkedResponseMsg.addObjPart(list, zipValues);
+ chunkedResponseMsg.addObjPart(list, false);
if (logger.isDebugEnabled()) {
logger.debug("{}: Sending {} getAll response chunk for region={} values={} chunk=<{}>",
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll651.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll651.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll651.java
index a19d540..ad8ef49 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll651.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll651.java
@@ -21,7 +21,6 @@ import java.util.Set;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.operations.GetOperationContext;
import org.apache.geode.internal.cache.LocalRegion;
-import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
@@ -45,33 +44,34 @@ public class GetAll651 extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, InterruptedException {
Part regionNamePart = null, keysPart = null;
String regionName = null;
Object[] keys = null;
- servConn.setAsTrue(REQUIRES_RESPONSE);
- servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
// Retrieve the region name from the message parts
- regionNamePart = msg.getPart(0);
+ regionNamePart = clientMessage.getPart(0);
regionName = regionNamePart.getString();
// Retrieve the keys array from the message parts
- keysPart = msg.getPart(1);
+ keysPart = clientMessage.getPart(1);
try {
keys = (Object[]) keysPart.getObject();
} catch (Exception e) {
- writeChunkedException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeChunkedException(clientMessage, e, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
if (logger.isDebugEnabled()) {
StringBuffer buffer = new StringBuffer();
- buffer.append(servConn.getName()).append(": Received getAll request (")
- .append(msg.getPayloadLength()).append(" bytes) from ").append(servConn.getSocketString())
- .append(" for region ").append(regionName).append(" keys ");
+ buffer.append(serverConnection.getName()).append(": Received getAll request (")
+ .append(clientMessage.getPayloadLength()).append(" bytes) from ")
+ .append(serverConnection.getSocketString()).append(" for region ").append(regionName)
+ .append(" keys ");
if (keys != null) {
for (int i = 0; i < keys.length; i++) {
buffer.append(keys[i]).append(" ");
@@ -90,37 +90,38 @@ public class GetAll651 extends BaseCommand {
message = LocalizedStrings.GetAll_THE_INPUT_REGION_NAME_FOR_THE_GETALL_REQUEST_IS_NULL
.toLocalizedString();
}
- logger.warn("{}: {}", servConn.getName(), message);
- writeChunkedErrorResponse(msg, MessageType.GET_ALL_DATA_ERROR, message, servConn);
- servConn.setAsTrue(RESPONDED);
+ logger.warn("{}: {}", serverConnection.getName(), message);
+ writeChunkedErrorResponse(clientMessage, MessageType.GET_ALL_DATA_ERROR, message,
+ serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName);
+ LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName);
if (region == null) {
String reason = " was not found during getAll request";
- writeRegionDestroyedEx(msg, regionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
// Send header
- ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
+ ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage();
chunkedResponseMsg.setMessageType(MessageType.RESPONSE);
- chunkedResponseMsg.setTransactionId(msg.getTransactionId());
+ chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId());
chunkedResponseMsg.sendHeader();
// Send chunk response
try {
- fillAndSendGetAllResponseChunks(region, regionName, keys, servConn);
- servConn.setAsTrue(RESPONDED);
+ fillAndSendGetAllResponseChunks(region, regionName, keys, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
} catch (Exception e) {
// If an interrupted exception is thrown , rethrow it
- checkForInterrupt(servConn, e);
+ checkForInterrupt(serverConnection, e);
// Otherwise, write an exception message and continue
- writeChunkedException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeChunkedException(clientMessage, e, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
@@ -148,7 +149,7 @@ public class GetAll651 extends BaseCommand {
final boolean isDebugEnabled = logger.isDebugEnabled();
for (int i = 0; i < numKeys; i++) {
// Send the intermediate chunk if necessary
- if (values.size() == maximumChunkSize) {
+ if (values.size() == MAXIMUM_CHUNK_SIZE) {
// Send the chunk and clear the list
sendGetAllResponseChunk(region, values, false, servConn);
values.clear();
@@ -253,7 +254,7 @@ public class GetAll651 extends BaseCommand {
* @param includeKeys if the part list should include the keys
*/
protected ObjectPartList651 getObjectPartsList(boolean includeKeys) {
- ObjectPartList651 values = new ObjectPartList651(maximumChunkSize, includeKeys);
+ ObjectPartList651 values = new ObjectPartList651(MAXIMUM_CHUNK_SIZE, includeKeys);
return values;
}
@@ -262,7 +263,7 @@ public class GetAll651 extends BaseCommand {
ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
chunkedResponseMsg.setNumberOfParts(1);
chunkedResponseMsg.setLastChunk(lastChunk);
- chunkedResponseMsg.addObjPart(list, zipValues);
+ chunkedResponseMsg.addObjPart(list, false);
if (logger.isDebugEnabled()) {
logger.debug("{}: Sending {} getAll response chunk for region={} values={} chunk=<{}>",
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll70.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll70.java
index 154e800..267a5b2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll70.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll70.java
@@ -23,7 +23,6 @@ import org.apache.geode.cache.operations.GetOperationContext;
import org.apache.geode.cache.operations.internal.GetOperationContextImpl;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.LocalRegion;
-import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
@@ -40,7 +39,6 @@ import org.apache.geode.internal.offheap.OffHeapHelper;
import org.apache.geode.internal.offheap.annotations.Retained;
import org.apache.geode.internal.security.AuthorizeRequest;
import org.apache.geode.internal.security.AuthorizeRequestPP;
-import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.security.NotAuthorizedException;
public class GetAll70 extends BaseCommand {
@@ -52,36 +50,37 @@ public class GetAll70 extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, InterruptedException {
Part regionNamePart = null, keysPart = null;
String regionName = null;
Object[] keys = null;
- servConn.setAsTrue(REQUIRES_RESPONSE);
- servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
int partIdx = 0;
// Retrieve the region name from the message parts
- regionNamePart = msg.getPart(partIdx++);
+ regionNamePart = clientMessage.getPart(partIdx++);
regionName = regionNamePart.getString();
// Retrieve the keys array from the message parts
- keysPart = msg.getPart(partIdx++);
+ keysPart = clientMessage.getPart(partIdx++);
try {
keys = (Object[]) keysPart.getObject();
} catch (Exception e) {
- writeChunkedException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeChunkedException(clientMessage, e, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
boolean requestSerializedValues;
- requestSerializedValues = msg.getPart(partIdx++).getInt() == 1;
+ requestSerializedValues = clientMessage.getPart(partIdx++).getInt() == 1;
if (logger.isDebugEnabled()) {
StringBuffer buffer = new StringBuffer();
- buffer.append(servConn.getName()).append(": Received getAll request (")
- .append(msg.getPayloadLength()).append(" bytes) from ").append(servConn.getSocketString())
- .append(" for region ").append(regionName).append(" keys ");
+ buffer.append(serverConnection.getName()).append(": Received getAll request (")
+ .append(clientMessage.getPayloadLength()).append(" bytes) from ")
+ .append(serverConnection.getSocketString()).append(" for region ").append(regionName)
+ .append(" keys ");
if (keys != null) {
for (int i = 0; i < keys.length; i++) {
buffer.append(keys[i]).append(" ");
@@ -100,37 +99,39 @@ public class GetAll70 extends BaseCommand {
message = LocalizedStrings.GetAll_THE_INPUT_REGION_NAME_FOR_THE_GETALL_REQUEST_IS_NULL
.toLocalizedString();
}
- logger.warn("{}: {}", servConn.getName(), message);
- writeChunkedErrorResponse(msg, MessageType.GET_ALL_DATA_ERROR, message, servConn);
- servConn.setAsTrue(RESPONDED);
+ logger.warn("{}: {}", serverConnection.getName(), message);
+ writeChunkedErrorResponse(clientMessage, MessageType.GET_ALL_DATA_ERROR, message,
+ serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName);
+ LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName);
if (region == null) {
String reason = " was not found during getAll request";
- writeRegionDestroyedEx(msg, regionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
// Send header
- ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
+ ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage();
chunkedResponseMsg.setMessageType(MessageType.RESPONSE);
- chunkedResponseMsg.setTransactionId(msg.getTransactionId());
+ chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId());
chunkedResponseMsg.sendHeader();
// Send chunk response
try {
- fillAndSendGetAllResponseChunks(region, regionName, keys, servConn, requestSerializedValues);
- servConn.setAsTrue(RESPONDED);
+ fillAndSendGetAllResponseChunks(region, regionName, keys, serverConnection,
+ requestSerializedValues);
+ serverConnection.setAsTrue(RESPONDED);
} catch (Exception e) {
// If an interrupted exception is thrown , rethrow it
- checkForInterrupt(servConn, e);
+ checkForInterrupt(serverConnection, e);
// Otherwise, write an exception message and continue
- writeChunkedException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeChunkedException(clientMessage, e, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
@@ -163,7 +164,7 @@ public class GetAll70 extends BaseCommand {
// in the old mode (which may be impossible since we only used that mode pre 7.0) in which the
// client told us
// to get and return all the keys and values. I think this was used for register interest.
- VersionedObjectList values = new VersionedObjectList(maximumChunkSize, keys == null,
+ VersionedObjectList values = new VersionedObjectList(MAXIMUM_CHUNK_SIZE, keys == null,
region.getAttributes().getConcurrencyChecksEnabled(), requestSerializedValues);
try {
AuthorizeRequest authzRequest = servConn.getAuthzRequest();
@@ -172,7 +173,7 @@ public class GetAll70 extends BaseCommand {
final boolean isDebugEnabled = logger.isDebugEnabled();
for (int i = 0; i < numKeys; i++) {
// Send the intermediate chunk if necessary
- if (values.size() == maximumChunkSize) {
+ if (values.size() == MAXIMUM_CHUNK_SIZE) {
// Send the chunk and clear the list
values.setKeys(null);
sendGetAllResponseChunk(region, values, false, servConn);
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAllForRI.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAllForRI.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAllForRI.java
index d380beb..43d3348 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAllForRI.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAllForRI.java
@@ -38,7 +38,7 @@ public class GetAllForRI extends GetAll651 {
@Override
protected ObjectPartList651 getObjectPartsList(boolean includeKeys) {
- return new SerializedObjectPartList(maximumChunkSize, includeKeys);
+ return new SerializedObjectPartList(MAXIMUM_CHUNK_SIZE, includeKeys);
}