You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/19 22:00:56 UTC
[19/28] geode git commit: Cleanup BaseCommand
http://git-wip-us.apache.org/repos/asf/geode/blob/db81427f/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RollbackCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RollbackCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RollbackCommand.java
index a579170..cd12ea7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RollbackCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RollbackCommand.java
@@ -39,18 +39,18 @@ public class RollbackCommand extends BaseCommand {
private RollbackCommand() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, ClassNotFoundException, InterruptedException {
- servConn.setAsTrue(REQUIRES_RESPONSE);
- TXManagerImpl txMgr = (TXManagerImpl) servConn.getCache().getCacheTransactionManager();
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
+ TXManagerImpl txMgr = (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager();
InternalDistributedMember client =
- (InternalDistributedMember) servConn.getProxyID().getDistributedMember();
- int uniqId = msg.getTransactionId();
+ (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember();
+ int uniqId = clientMessage.getTransactionId();
TXId txId = new TXId(client, uniqId);
if (txMgr.isHostedTxRecentlyCompleted(txId)) {
if (logger.isDebugEnabled()) {
logger.debug("TX: found a recently rolled back tx: {}", txId);
- sendRollbackReply(msg, servConn);
+ sendRollbackReply(clientMessage, serverConnection);
txMgr.removeHostedTXState(txId);
return;
}
@@ -60,16 +60,16 @@ public class RollbackCommand extends BaseCommand {
if (txState != null) {
txId = txState.getTxId();
txMgr.rollback();
- sendRollbackReply(msg, servConn);
+ sendRollbackReply(clientMessage, serverConnection);
} else {
// could not find TxState in the host server.
// Protect against a failover command received so late,
// and it is removed from the failoverMap due to capacity.
- sendRollbackReply(msg, servConn);
+ sendRollbackReply(clientMessage, serverConnection);
}
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
} finally {
if (logger.isDebugEnabled()) {
logger.debug("TX: removing tx state for {}", txId);
http://git-wip-us.apache.org/repos/asf/geode/blob/db81427f/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Size.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Size.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Size.java
index c78f4d9..42e14a3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Size.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Size.java
@@ -56,18 +56,18 @@ public class Size extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, InterruptedException {
StringBuilder errMessage = new StringBuilder();
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- CacheServerStats stats = servConn.getCacheServerStats();
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
long oldStart = start;
start = DistributionStats.getStatTime();
stats.incReadSizeRequestTime(start - oldStart);
// Retrieve the data from the message parts
- Part regionNamePart = msg.getPart(0);
+ Part regionNamePart = clientMessage.getPart(0);
String regionName = regionNamePart.getString();
if (regionName == null) {
@@ -76,8 +76,8 @@ public class Size extends BaseCommand {
errMessage
.append(LocalizedStrings.BaseCommand__THE_INPUT_REGION_NAME_FOR_THE_0_REQUEST_IS_NULL
.toLocalizedString("size"));
- writeErrorResponse(msg, MessageType.SIZE_ERROR, errMessage.toString(), servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.SIZE_ERROR, errMessage.toString(), serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -85,38 +85,38 @@ public class Size extends BaseCommand {
if (region == null) {
String reason = LocalizedStrings.BaseCommand__0_WAS_NOT_FOUND_DURING_1_REQUEST
.toLocalizedString(regionName, "size");
- writeRegionDestroyedEx(msg, regionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
// Size the entry
try {
this.securityService.authorizeRegionRead(regionName);
- writeSizeResponse(region.size(), msg, servConn);
+ writeSizeResponse(region.size(), clientMessage, serverConnection);
} catch (RegionDestroyedException rde) {
- writeException(msg, rde, false, servConn);
+ writeException(clientMessage, rde, false, serverConnection);
} catch (Exception e) {
// If an interrupted exception is thrown , rethrow it
- checkForInterrupt(servConn, e);
+ checkForInterrupt(serverConnection, e);
// If an exception occurs during the destroy, preserve the connection
- writeException(msg, e, false, servConn);
+ writeException(clientMessage, e, false, serverConnection);
if (e instanceof GemFireSecurityException) {
// Fine logging for security exceptions since these are already
// logged by the security logger
if (logger.isDebugEnabled()) {
- logger.debug("{}: Unexpected Security exception", servConn.getName(), e);
+ logger.debug("{}: Unexpected Security exception", serverConnection.getName(), e);
}
} else {
logger.warn(LocalizedMessage.create(LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION,
- servConn.getName()), e);
+ serverConnection.getName()), e);
}
} finally {
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sent size response for region {}", servConn.getName(), regionName);
+ logger.debug("{}: Sent size response for region {}", serverConnection.getName(), regionName);
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
stats.incWriteSizeResponseTime(DistributionStats.getStatTime() - start);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/db81427f/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
index 72eab50..9fc3fd1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
@@ -49,23 +49,23 @@ public class TXFailoverCommand extends BaseCommand {
private TXFailoverCommand() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, ClassNotFoundException, InterruptedException {
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
// Build the TXId for the transaction
InternalDistributedMember client =
- (InternalDistributedMember) servConn.getProxyID().getDistributedMember();
- int uniqId = msg.getTransactionId();
+ (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember();
+ int uniqId = clientMessage.getTransactionId();
if (logger.isDebugEnabled()) {
logger.debug("TX: Transaction {} from {} is failing over to this server", uniqId, client);
}
TXId txId = new TXId(client, uniqId);
- TXManagerImpl mgr = (TXManagerImpl) servConn.getCache().getCacheTransactionManager();
+ TXManagerImpl mgr = (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager();
mgr.waitForCompletingTransaction(txId); // in case it's already completing here in another
// thread
if (mgr.isHostedTxRecentlyCompleted(txId)) {
- writeReply(msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeReply(clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
mgr.removeHostedTXState(txId);
return;
}
@@ -75,7 +75,7 @@ public class TXFailoverCommand extends BaseCommand {
if (!tx.isRealDealLocal()) {
// send message to all peers to find out who hosts the transaction
FindRemoteTXMessageReplyProcessor processor =
- FindRemoteTXMessage.send(servConn.getCache(), txId);
+ FindRemoteTXMessage.send(serverConnection.getCache(), txId);
try {
processor.waitForRepliesUninterruptibly();
} catch (ReplyException e) {
@@ -96,7 +96,7 @@ public class TXFailoverCommand extends BaseCommand {
// bug #42228 and bug #43504 - this cannot return until the current view
// has been installed by all members, so that dlocks are released and
// the same keys can be used in a new transaction by the same client thread
- InternalCache cache = servConn.getCache();
+ InternalCache cache = serverConnection.getCache();
try {
WaitForViewInstallation.send((DistributionManager) cache.getDistributionManager());
} catch (InterruptedException e) {
@@ -110,9 +110,9 @@ public class TXFailoverCommand extends BaseCommand {
}
mgr.saveTXCommitMessageForClientFailover(txId, processor.getTxCommitMessage());
} else {
- writeException(msg, new TransactionDataNodeHasDepartedException(
- "Could not find transaction host for " + txId), false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, new TransactionDataNodeHasDepartedException(
+ "Could not find transaction host for " + txId), false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
mgr.removeHostedTXState(txId);
return;
}
@@ -121,8 +121,8 @@ public class TXFailoverCommand extends BaseCommand {
if (!wasInProgress) {
mgr.setInProgress(false);
}
- writeReply(msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeReply(clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/db81427f/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
index 8cedd2c..c5b9fc5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
@@ -15,7 +15,6 @@
package org.apache.geode.internal.cache.tier.sockets.command;
-import org.apache.geode.cache.SynchronizationCommitConflictException;
import org.apache.geode.cache.client.internal.TXSynchronizationOp.CompletionType;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ReplyException;
@@ -54,7 +53,7 @@ public class TXSynchronizationCommand extends BaseCommand {
* org.apache.geode.internal.cache.tier.sockets.ServerConnection)
*/
@Override
- protected boolean shouldMasqueradeForTx(Message msg, ServerConnection servConn) {
+ protected boolean shouldMasqueradeForTx(Message clientMessage, ServerConnection serverConnection) {
// masquerading is done in the waiting thread pool
return false;
}
@@ -68,26 +67,26 @@ public class TXSynchronizationCommand extends BaseCommand {
* long)
*/
@Override
- public void cmdExecute(final Message msg, final ServerConnection servConn, long start)
+ public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection, long start)
throws IOException, ClassNotFoundException, InterruptedException {
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
- CompletionType type = CompletionType.values()[msg.getPart(0).getInt()];
- /* int txIdInt = */ msg.getPart(1).getInt(); // [bruce] not sure if we need to transmit this
+ CompletionType type = CompletionType.values()[clientMessage.getPart(0).getInt()];
+ /* int txIdInt = */ clientMessage.getPart(1).getInt(); // [bruce] not sure if we need to transmit this
final Part statusPart;
if (type == CompletionType.AFTER_COMPLETION) {
- statusPart = msg.getPart(2);
+ statusPart = clientMessage.getPart(2);
} else {
statusPart = null;
}
- final TXManagerImpl txMgr = (TXManagerImpl) servConn.getCache().getCacheTransactionManager();
+ final TXManagerImpl txMgr = (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager();
final InternalDistributedMember member =
- (InternalDistributedMember) servConn.getProxyID().getDistributedMember();
+ (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember();
// get the tx state without associating it with this thread. That's done later
- final TXStateProxy txProxy = txMgr.masqueradeAs(msg, member, true);
+ final TXStateProxy txProxy = txMgr.masqueradeAs(clientMessage, member, true);
// we have to run beforeCompletion and afterCompletion in the same thread
// because beforeCompletion obtains locks for the thread and afterCompletion
@@ -102,21 +101,21 @@ public class TXSynchronizationCommand extends BaseCommand {
TXStateProxy txState = null;
Throwable failureException = null;
try {
- txState = txMgr.masqueradeAs(msg, member, false);
+ txState = txMgr.masqueradeAs(clientMessage, member, false);
if (isDebugEnabled) {
logger.debug("Executing beforeCompletion() notification for transaction {}",
- msg.getTransactionId());
+ clientMessage.getTransactionId());
}
txState.setIsJTA(true);
txState.beforeCompletion();
try {
- writeReply(msg, servConn);
+ writeReply(clientMessage, serverConnection);
} catch (IOException e) {
if (isDebugEnabled) {
logger.debug("Problem writing reply to client", e);
}
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
} catch (ReplyException e) {
failureException = e.getCause();
} catch (InterruptedException e) {
@@ -128,13 +127,13 @@ public class TXSynchronizationCommand extends BaseCommand {
}
if (failureException != null) {
try {
- writeException(msg, failureException, false, servConn);
+ writeException(clientMessage, failureException, false, serverConnection);
} catch (IOException ioe) {
if (isDebugEnabled) {
logger.debug("Problem writing reply to client", ioe);
}
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
}
}
};
@@ -150,11 +149,11 @@ public class TXSynchronizationCommand extends BaseCommand {
public void run() {
TXStateProxy txState = null;
try {
- txState = txMgr.masqueradeAs(msg, member, false);
+ txState = txMgr.masqueradeAs(clientMessage, member, false);
int status = statusPart.getInt();
if (isDebugEnabled) {
logger.debug("Executing afterCompletion({}) notification for transaction {}",
- status, msg.getTransactionId());
+ status, clientMessage.getTransactionId());
}
txState.setIsJTA(true);
txState.afterCompletion(status);
@@ -162,7 +161,7 @@ public class TXSynchronizationCommand extends BaseCommand {
// where it can be applied to the local cache
TXCommitMessage cmsg = txState.getCommitMessage();
try {
- CommitCommand.writeCommitResponse(cmsg, msg, servConn);
+ CommitCommand.writeCommitResponse(cmsg, clientMessage, serverConnection);
txMgr.removeHostedTXState(txState.getTxId());
} catch (IOException e) {
// not much can be done here
@@ -170,16 +169,16 @@ public class TXSynchronizationCommand extends BaseCommand {
logger.warn("Problem writing reply to client", e);
}
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
} catch (RuntimeException e) {
try {
- writeException(msg, e, false, servConn);
+ writeException(clientMessage, e, false, serverConnection);
} catch (IOException ioe) {
if (isDebugEnabled) {
logger.debug("Problem writing reply to client", ioe);
}
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
@@ -195,12 +194,12 @@ public class TXSynchronizationCommand extends BaseCommand {
sync.runSecondRunnable(afterCompletion);
} else {
if (statusPart.getInt() == Status.STATUS_COMMITTED) {
- TXStateProxy txState = txMgr.masqueradeAs(msg, member, false);
+ TXStateProxy txState = txMgr.masqueradeAs(clientMessage, member, false);
try {
if (isDebugEnabled) {
logger.debug(
"Executing beforeCompletion() notification for transaction {} after failover",
- msg.getTransactionId());
+ clientMessage.getTransactionId());
}
txState.setIsJTA(true);
txState.beforeCompletion();
@@ -212,8 +211,8 @@ public class TXSynchronizationCommand extends BaseCommand {
}
}
} catch (Exception e) {
- writeException(msg, MessageType.EXCEPTION, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, MessageType.EXCEPTION, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
}
if (isDebugEnabled) {
logger.debug("Sent tx synchronization response");
http://git-wip-us.apache.org/repos/asf/geode/blob/db81427f/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterest.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterest.java
index 7dbb78f..597f92b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterest.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterest.java
@@ -45,43 +45,43 @@ public class UnregisterInterest extends BaseCommand {
UnregisterInterest() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws ClassNotFoundException, IOException {
Part regionNamePart = null, keyPart = null;
String regionName = null;
Object key = null;
int interestType = 0;
StringId errMessage = null;
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
- regionNamePart = msg.getPart(0);
- interestType = msg.getPart(1).getInt();
- keyPart = msg.getPart(2);
- Part isClosingPart = msg.getPart(3);
+ regionNamePart = clientMessage.getPart(0);
+ interestType = clientMessage.getPart(1).getInt();
+ keyPart = clientMessage.getPart(2);
+ Part isClosingPart = clientMessage.getPart(3);
byte[] isClosingPartBytes = (byte[]) isClosingPart.getObject();
boolean isClosing = isClosingPartBytes[0] == 0x01;
regionName = regionNamePart.getString();
try {
key = keyPart.getStringOrObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
boolean keepalive = false;
try {
- Part keepalivePart = msg.getPart(4);
+ Part keepalivePart = clientMessage.getPart(4);
byte[] keepaliveBytes = (byte[]) keepalivePart.getObject();
keepalive = keepaliveBytes[0] != 0x00;
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Received unregister interest request ({} bytes) from {} for region {} key {}",
- servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key);
+ serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), regionName, key);
}
// Process the unregister interest request
@@ -95,9 +95,9 @@ public class UnregisterInterest extends BaseCommand {
errMessage =
LocalizedStrings.UnRegisterInterest_THE_INPUT_REGION_NAME_FOR_THE_UNREGISTER_INTEREST_REQUEST_IS_NULL;
String s = errMessage.toLocalizedString();
- logger.warn("{}: {}", servConn.getName(), s);
- writeErrorResponse(msg, MessageType.UNREGISTER_INTEREST_DATA_ERROR, s, servConn);
- servConn.setAsTrue(RESPONDED);
+ logger.warn("{}: {}", serverConnection.getName(), s);
+ writeErrorResponse(clientMessage, MessageType.UNREGISTER_INTEREST_DATA_ERROR, s, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -108,12 +108,12 @@ public class UnregisterInterest extends BaseCommand {
this.securityService.authorizeRegionRead(regionName, key.toString());
}
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
if (!DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
try {
@@ -121,8 +121,8 @@ public class UnregisterInterest extends BaseCommand {
authzRequest.unregisterInterestAuthorize(regionName, key, interestType);
key = unregisterContext.getKey();
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
@@ -141,17 +141,17 @@ public class UnregisterInterest extends BaseCommand {
*/
// Unregister interest irrelevent of whether the region is present it or
// not
- servConn.getAcceptor().getCacheClientNotifier().unregisterClientInterest(regionName, key,
- interestType, isClosing, servConn.getProxyID(), keepalive);
+ serverConnection.getAcceptor().getCacheClientNotifier().unregisterClientInterest(regionName, key,
+ interestType, isClosing, serverConnection.getProxyID(), keepalive);
// Update the statistics and write the reply
// bserverStats.incLong(processDestroyTimeId,
// DistributionStats.getStatTime() - start);
// start = DistributionStats.getStatTime();
- writeReply(msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeReply(clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sent unregister interest response for region {} key {}", servConn.getName(),
+ logger.debug("{}: Sent unregister interest response for region {} key {}", serverConnection.getName(),
regionName, key);
}
// bserverStats.incLong(writeDestroyResponseTimeId,
http://git-wip-us.apache.org/repos/asf/geode/blob/db81427f/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterestList.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterestList.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterestList.java
index 7369587..76cbba2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterestList.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterestList.java
@@ -46,48 +46,48 @@ public class UnregisterInterestList extends BaseCommand {
private UnregisterInterestList() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, ClassNotFoundException {
Part regionNamePart = null, keyPart = null, numberOfKeysPart = null;
String regionName = null;
Object key = null;
List keys = null;
int numberOfKeys = 0, partNumber = 0;
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
// bserverStats.incLong(readDestroyRequestTimeId,
// DistributionStats.getStatTime() - start);
// bserverStats.incInt(destroyRequestsId, 1);
// start = DistributionStats.getStatTime();
// Retrieve the data from the message parts
- regionNamePart = msg.getPart(0);
+ regionNamePart = clientMessage.getPart(0);
regionName = regionNamePart.getString();
- Part isClosingListPart = msg.getPart(1);
+ Part isClosingListPart = clientMessage.getPart(1);
byte[] isClosingListPartBytes = (byte[]) isClosingListPart.getObject();
boolean isClosingList = isClosingListPartBytes[0] == 0x01;
boolean keepalive = false;
try {
- Part keepalivePart = msg.getPart(2);
+ Part keepalivePart = clientMessage.getPart(2);
byte[] keepalivePartBytes = (byte[]) keepalivePart.getObject();
keepalive = keepalivePartBytes[0] == 0x01;
} catch (Exception e) {
- writeChunkedException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeChunkedException(clientMessage, e, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- numberOfKeysPart = msg.getPart(3);
+ numberOfKeysPart = clientMessage.getPart(3);
numberOfKeys = numberOfKeysPart.getInt();
partNumber = 4;
keys = new ArrayList();
for (int i = 0; i < numberOfKeys; i++) {
- keyPart = msg.getPart(partNumber + i);
+ keyPart = clientMessage.getPart(partNumber + i);
try {
key = keyPart.getStringOrObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
keys.add(key);
@@ -95,7 +95,7 @@ public class UnregisterInterestList extends BaseCommand {
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Received unregister interest request ({} bytes) from {} for the following {} keys in region {}: {}",
- servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), numberOfKeys,
+ serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), numberOfKeys,
regionName, keys);
}
@@ -113,22 +113,22 @@ public class UnregisterInterestList extends BaseCommand {
LocalizedStrings.UnRegisterInterest_THE_INPUT_REGION_NAME_FOR_THE_UNREGISTER_INTEREST_REQUEST_IS_NULL;
}
String s = errMessage.toLocalizedString();
- logger.warn("{}: {}", servConn.getName(), s);
- writeErrorResponse(msg, MessageType.UNREGISTER_INTEREST_DATA_ERROR, s, servConn);
- servConn.setAsTrue(RESPONDED);
+ logger.warn("{}: {}", serverConnection.getName(), s);
+ writeErrorResponse(clientMessage, MessageType.UNREGISTER_INTEREST_DATA_ERROR, s, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
try {
this.securityService.authorizeRegionRead(regionName);
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
if (!DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
try {
@@ -136,8 +136,8 @@ public class UnregisterInterestList extends BaseCommand {
authzRequest.unregisterInterestListAuthorize(regionName, keys);
keys = (List) unregisterContext.getKey();
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
@@ -155,20 +155,20 @@ public class UnregisterInterestList extends BaseCommand {
* responded = true; } else {
*/
// Register interest
- servConn.getAcceptor().getCacheClientNotifier().unregisterClientInterest(regionName, keys,
- isClosingList, servConn.getProxyID(), keepalive);
+ serverConnection.getAcceptor().getCacheClientNotifier().unregisterClientInterest(regionName, keys,
+ isClosingList, serverConnection.getProxyID(), keepalive);
// Update the statistics and write the reply
// bserverStats.incLong(processDestroyTimeId,
// DistributionStats.getStatTime() - start);
// start = DistributionStats.getStatTime(); WHY ARE GETTING START AND NOT
// USING IT?
- writeReply(msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeReply(clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Sent unregister interest response for the following {} keys in region {}: {}",
- servConn.getName(), numberOfKeys, regionName, keys);
+ serverConnection.getName(), numberOfKeys, regionName, keys);
}
// bserverStats.incLong(writeDestroyResponseTimeId,
// DistributionStats.getStatTime() - start);
http://git-wip-us.apache.org/repos/asf/geode/blob/db81427f/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UpdateClientNotification.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UpdateClientNotification.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UpdateClientNotification.java
index 57aca22..b870a96 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UpdateClientNotification.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UpdateClientNotification.java
@@ -35,8 +35,8 @@ public class UpdateClientNotification extends BaseCommand {
private UpdateClientNotification() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
- CacheServerStats stats = servConn.getCacheServerStats();
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException {
+ CacheServerStats stats = serverConnection.getCacheServerStats();
{
long oldStart = start;
start = DistributionStats.getStatTime();
http://git-wip-us.apache.org/repos/asf/geode/blob/db81427f/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseCQ.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseCQ.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseCQ.java
index ac9b5da..72719b2 100644
--- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseCQ.java
+++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseCQ.java
@@ -44,30 +44,30 @@ public class CloseCQ extends BaseCQCommand {
private CloseCQ() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- ClientProxyMembershipID id = servConn.getProxyID();
- CacheServerStats stats = servConn.getCacheServerStats();
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException {
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ ClientProxyMembershipID id = serverConnection.getProxyID();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
// Based on MessageType.QUERY
// Added by Rao 2/1/2007
- servConn.setAsTrue(REQUIRES_RESPONSE);
- servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
start = DistributionStats.getStatTime();
// Retrieve the data from the message parts
- String cqName = msg.getPart(0).getString();
+ String cqName = clientMessage.getPart(0).getString();
if (logger.isDebugEnabled()) {
- logger.debug("{}: Received close CQ request from {} cqName: {}", servConn.getName(),
- servConn.getSocketString(), cqName);
+ logger.debug("{}: Received close CQ request from {} cqName: {}", serverConnection.getName(),
+ serverConnection.getSocketString(), cqName);
}
// Process the query request
if (cqName == null) {
String err =
LocalizedStrings.CloseCQ_THE_CQNAME_FOR_THE_CQ_CLOSE_REQUEST_IS_NULL.toLocalizedString();
- sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg.getTransactionId(), null, servConn);
+ sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, clientMessage.getTransactionId(), null, serverConnection);
return;
}
@@ -85,7 +85,7 @@ public class CloseCQ extends BaseCQCommand {
}
InternalCqQuery cqQuery = cqService.getCq(serverCqName);
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
String queryStr = null;
Set cqRegionNames = null;
@@ -102,22 +102,22 @@ public class CloseCQ extends BaseCQCommand {
// getMembershipID());
cqService.closeCq(cqName, id);
if (cqQuery != null)
- servConn.removeCq(cqName, cqQuery.isDurable());
+ serverConnection.removeCq(cqName, cqQuery.isDurable());
} catch (CqException cqe) {
- sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(), cqe, servConn);
+ sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", clientMessage.getTransactionId(), cqe, serverConnection);
return;
} catch (Exception e) {
String err =
LocalizedStrings.CloseCQ_EXCEPTION_WHILE_CLOSING_CQ_CQNAME_0.toLocalizedString(cqName);
- sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, msg.getTransactionId(), e, servConn);
+ sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, clientMessage.getTransactionId(), e, serverConnection);
return;
}
// Send OK to client
sendCqResponse(MessageType.REPLY,
- LocalizedStrings.CloseCQ_CQ_CLOSED_SUCCESSFULLY.toLocalizedString(), msg.getTransactionId(),
- null, servConn);
- servConn.setAsTrue(RESPONDED);
+ LocalizedStrings.CloseCQ_CQ_CLOSED_SUCCESSFULLY.toLocalizedString(), clientMessage.getTransactionId(),
+ null, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
{
long oldStart = start;
http://git-wip-us.apache.org/repos/asf/geode/blob/db81427f/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ.java
index 9bddbc7..d2a4453 100644
--- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ.java
+++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ.java
@@ -52,27 +52,27 @@ public class ExecuteCQ extends BaseCQCommand {
private ExecuteCQ() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, InterruptedException {
- AcceptorImpl acceptor = servConn.getAcceptor();
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- ClientProxyMembershipID id = servConn.getProxyID();
- CacheServerStats stats = servConn.getCacheServerStats();
+ AcceptorImpl acceptor = serverConnection.getAcceptor();
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ ClientProxyMembershipID id = serverConnection.getProxyID();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
- servConn.setAsTrue(REQUIRES_RESPONSE);
- servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
// Retrieve the data from the message parts
- String cqName = msg.getPart(0).getString();
- String cqQueryString = msg.getPart(1).getString();
- int cqState = msg.getPart(2).getInt();
+ String cqName = clientMessage.getPart(0).getString();
+ String cqQueryString = clientMessage.getPart(1).getString();
+ int cqState = clientMessage.getPart(2).getInt();
- Part isDurablePart = msg.getPart(3);
+ Part isDurablePart = clientMessage.getPart(3);
byte[] isDurableByte = isDurablePart.getSerializedForm();
boolean isDurable = (isDurableByte == null || isDurableByte[0] == 0) ? false : true;
if (logger.isDebugEnabled()) {
- logger.debug("{}: Received {} request from {} CqName: {} queryString: {}", servConn.getName(),
- MessageType.getString(msg.getMessageType()), servConn.getSocketString(), cqName,
+ logger.debug("{}: Received {} request from {} CqName: {} queryString: {}", serverConnection.getName(),
+ MessageType.getString(clientMessage.getMessageType()), serverConnection.getSocketString(), cqName,
cqQueryString);
}
@@ -87,7 +87,7 @@ public class ExecuteCQ extends BaseCQCommand {
qService = (DefaultQueryService) crHelper.getCache().getLocalQueryService();
// Authorization check
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
query = qService.newQuery(cqQueryString);
cqRegionNames = ((DefaultQuery) query).getRegionsInQuery(null);
@@ -108,10 +108,10 @@ public class ExecuteCQ extends BaseCQCommand {
cqQuery = cqServiceForExec.executeCq(cqName, cqQueryString, cqState, id,
acceptor.getCacheClientNotifier(), isDurable, false, 0, null);
} catch (CqException cqe) {
- sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(), cqe, servConn);
+ sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", clientMessage.getTransactionId(), cqe, serverConnection);
return;
} catch (Exception e) {
- writeChunkedException(msg, e, false, servConn);
+ writeChunkedException(clientMessage, e, serverConnection);
return;
}
@@ -119,7 +119,7 @@ public class ExecuteCQ extends BaseCQCommand {
boolean sendResults = false;
boolean successQuery = false;
- if (msg.getMessageType() == MessageType.EXECUTECQ_WITH_IR_MSG_TYPE) {
+ if (clientMessage.getMessageType() == MessageType.EXECUTECQ_WITH_IR_MSG_TYPE) {
sendResults = true;
}
@@ -130,8 +130,8 @@ public class ExecuteCQ extends BaseCQCommand {
cqRegionNames = ((DefaultQuery) query).getRegionsInQuery(null);
}
((DefaultQuery) query).setIsCqQuery(true);
- successQuery = processQuery(msg, query, cqQueryString, cqRegionNames, start, cqQuery,
- executeCQContext, servConn, sendResults);
+ successQuery = processQuery(clientMessage, query, cqQueryString, cqRegionNames, start, cqQuery,
+ executeCQContext, serverConnection, sendResults);
// Update the CQ statistics.
cqQuery.getVsdStats().setCqInitialResultsTime((DistributionStats.getStatTime()) - oldstart);
@@ -153,12 +153,12 @@ public class ExecuteCQ extends BaseCQCommand {
// Send OK to client
sendCqResponse(MessageType.REPLY,
LocalizedStrings.ExecuteCQ_CQ_CREATED_SUCCESSFULLY.toLocalizedString(),
- msg.getTransactionId(), null, servConn);
+ clientMessage.getTransactionId(), null, serverConnection);
long start2 = DistributionStats.getStatTime();
stats.incProcessCreateCqTime(start2 - oldstart);
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/db81427f/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ61.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ61.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ61.java
index de61445..805ee48 100755
--- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ61.java
+++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ61.java
@@ -60,30 +60,30 @@ public class ExecuteCQ61 extends BaseCQCommand {
private ExecuteCQ61() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, InterruptedException {
- AcceptorImpl acceptor = servConn.getAcceptor();
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- ClientProxyMembershipID id = servConn.getProxyID();
- CacheServerStats stats = servConn.getCacheServerStats();
+ AcceptorImpl acceptor = serverConnection.getAcceptor();
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ ClientProxyMembershipID id = serverConnection.getProxyID();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
- servConn.setAsTrue(REQUIRES_RESPONSE);
- servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
// Retrieve the data from the message parts
- String cqName = msg.getPart(0).getString();
- String cqQueryString = msg.getPart(1).getString();
- int cqState = msg.getPart(2).getInt();
+ String cqName = clientMessage.getPart(0).getString();
+ String cqQueryString = clientMessage.getPart(1).getString();
+ int cqState = clientMessage.getPart(2).getInt();
- Part isDurablePart = msg.getPart(3);
+ Part isDurablePart = clientMessage.getPart(3);
byte[] isDurableByte = isDurablePart.getSerializedForm();
boolean isDurable = (isDurableByte == null || isDurableByte[0] == 0) ? false : true;
// region data policy
- Part regionDataPolicyPart = msg.getPart(msg.getNumberOfParts() - 1);
+ Part regionDataPolicyPart = clientMessage.getPart(clientMessage.getNumberOfParts() - 1);
byte[] regionDataPolicyPartBytes = regionDataPolicyPart.getSerializedForm();
if (logger.isDebugEnabled()) {
- logger.debug("{}: Received {} request from {} CqName: {} queryString: {}", servConn.getName(),
- MessageType.getString(msg.getMessageType()), servConn.getSocketString(), cqName,
+ logger.debug("{}: Received {} request from {} CqName: {} queryString: {}", serverConnection.getName(),
+ MessageType.getString(clientMessage.getMessageType()), serverConnection.getSocketString(), cqName,
cqQueryString);
}
@@ -96,8 +96,7 @@ public class ExecuteCQ61 extends BaseCQCommand {
String err =
LocalizedStrings.ExecuteCQ_SERVER_NOTIFYBYSUBSCRIPTION_MODE_IS_SET_TO_FALSE_CQ_EXECUTION_IS_NOT_SUPPORTED_IN_THIS_MODE
.toLocalizedString();
- sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg.getTransactionId(), null,
- servConn);
+ sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, clientMessage.getTransactionId(), null, serverConnection);
return;
}
}
@@ -113,7 +112,7 @@ public class ExecuteCQ61 extends BaseCQCommand {
qService = (DefaultQueryService) crHelper.getCache().getLocalQueryService();
// Authorization check
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
query = qService.newQuery(cqQueryString);
cqRegionNames = ((DefaultQuery) query).getRegionsInQuery(null);
@@ -141,16 +140,16 @@ public class ExecuteCQ61 extends BaseCQCommand {
// registering cq with serverConnection so that when CCP will require auth info it can access
// that
// registering cq auth before as possibility that you may get event
- servConn.setCq(cqName, isDurable);
+ serverConnection.setCq(cqName, isDurable);
cqQuery = (ServerCQImpl) cqServiceForExec.executeCq(cqName, cqQueryString, cqState, id, ccn,
isDurable, true, regionDataPolicyPartBytes[0], null);
} catch (CqException cqe) {
- sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(), cqe, servConn);
- servConn.removeCq(cqName, isDurable);
+ sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", clientMessage.getTransactionId(), cqe, serverConnection);
+ serverConnection.removeCq(cqName, isDurable);
return;
} catch (Exception e) {
- writeChunkedException(msg, e, false, servConn);
- servConn.removeCq(cqName, isDurable);
+ writeChunkedException(clientMessage, e, serverConnection);
+ serverConnection.removeCq(cqName, isDurable);
return;
}
@@ -158,7 +157,7 @@ public class ExecuteCQ61 extends BaseCQCommand {
boolean sendResults = false;
boolean successQuery = false;
- if (msg.getMessageType() == MessageType.EXECUTECQ_WITH_IR_MSG_TYPE) {
+ if (clientMessage.getMessageType() == MessageType.EXECUTECQ_WITH_IR_MSG_TYPE) {
sendResults = true;
}
@@ -173,8 +172,8 @@ public class ExecuteCQ61 extends BaseCQCommand {
cqRegionNames = ((DefaultQuery) query).getRegionsInQuery(null);
}
((DefaultQuery) query).setIsCqQuery(true);
- successQuery = processQuery(msg, query, cqQueryString, cqRegionNames, start, cqQuery,
- executeCQContext, servConn, sendResults);
+ successQuery = processQuery(clientMessage, query, cqQueryString, cqRegionNames, start, cqQuery,
+ executeCQContext, serverConnection, sendResults);
// Update the CQ statistics.
@@ -203,12 +202,12 @@ public class ExecuteCQ61 extends BaseCQCommand {
// Send OK to client
sendCqResponse(MessageType.REPLY,
LocalizedStrings.ExecuteCQ_CQ_CREATED_SUCCESSFULLY.toLocalizedString(),
- msg.getTransactionId(), null, servConn);
+ clientMessage.getTransactionId(), null, serverConnection);
long start2 = DistributionStats.getStatTime();
stats.incProcessCreateCqTime(start2 - oldstart);
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
}
http://git-wip-us.apache.org/repos/asf/geode/blob/db81427f/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetCQStats.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetCQStats.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetCQStats.java
index 69be347..b1faeee 100644
--- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetCQStats.java
+++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetCQStats.java
@@ -36,32 +36,32 @@ public class GetCQStats extends BaseCQCommand {
private GetCQStats() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException {
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
- CacheServerStats stats = servConn.getCacheServerStats();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
- servConn.setAsTrue(REQUIRES_RESPONSE);
- servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
- logger.debug("{}: Received close all client CQs request from {}", servConn.getName(),
- servConn.getSocketString());
+ logger.debug("{}: Received close all client CQs request from {}", serverConnection.getName(),
+ serverConnection.getSocketString());
}
// Retrieve the data from the message parts
- String cqName = msg.getPart(0).getString();
+ String cqName = clientMessage.getPart(0).getString();
if (isDebugEnabled) {
- logger.debug("{}: Received close CQ request from {} cqName: {}", servConn.getName(),
- servConn.getSocketString(), cqName);
+ logger.debug("{}: Received close CQ request from {} cqName: {}", serverConnection.getName(),
+ serverConnection.getSocketString(), cqName);
}
// Process the query request
if (cqName == null) {
String err = "The cqName for the cq stats request is null";
- sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg.getTransactionId(), null, servConn);
+ sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, clientMessage.getTransactionId(), null, serverConnection);
return;
}
@@ -74,13 +74,12 @@ public class GetCQStats extends BaseCQCommand {
cqService.start();
} catch (Exception e) {
String err = "Exception while Getting the CQ Statistics. ";
- sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, msg.getTransactionId(), e, servConn);
+ sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, clientMessage.getTransactionId(), e, serverConnection);
return;
}
// Send OK to client
- sendCqResponse(MessageType.REPLY, "cq stats sent successfully.", msg.getTransactionId(), null,
- servConn);
- servConn.setAsTrue(RESPONDED);
+ sendCqResponse(MessageType.REPLY, "cq stats sent successfully.", clientMessage.getTransactionId(), null, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
{
long oldStart = start;
http://git-wip-us.apache.org/repos/asf/geode/blob/db81427f/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetDurableCQs.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetDurableCQs.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetDurableCQs.java
index a2d201d..e39c8e1 100755
--- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetDurableCQs.java
+++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetDurableCQs.java
@@ -44,19 +44,19 @@ public class GetDurableCQs extends BaseCQCommand {
private GetDurableCQs() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, InterruptedException {
- AcceptorImpl acceptor = servConn.getAcceptor();
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- ClientProxyMembershipID id = servConn.getProxyID();
- CacheServerStats stats = servConn.getCacheServerStats();
+ AcceptorImpl acceptor = serverConnection.getAcceptor();
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ ClientProxyMembershipID id = serverConnection.getProxyID();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
- servConn.setAsTrue(REQUIRES_RESPONSE);
- servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
if (logger.isDebugEnabled()) {
- logger.debug("{}: Received {} request from {}", servConn.getName(),
- MessageType.getString(msg.getMessageType()), servConn.getSocketString());
+ logger.debug("{}: Received {} request from {}", serverConnection.getName(),
+ MessageType.getString(clientMessage.getMessageType()), serverConnection.getSocketString());
}
DefaultQueryService qService = null;
@@ -68,7 +68,7 @@ public class GetDurableCQs extends BaseCQCommand {
this.securityService.authorizeClusterRead();
// Authorization check
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
authzRequest.getDurableCQsAuthorize();
}
@@ -76,34 +76,34 @@ public class GetDurableCQs extends BaseCQCommand {
cqServiceForExec = qService.getCqService();
List<String> durableCqs = cqServiceForExec.getAllDurableClientCqs(id);
- ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
+ ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage();
chunkedResponseMsg.setMessageType(MessageType.RESPONSE);
- chunkedResponseMsg.setTransactionId(msg.getTransactionId());
+ chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId());
chunkedResponseMsg.sendHeader();
- List durableCqList = new ArrayList(maximumChunkSize);
+ List durableCqList = new ArrayList(MAXIMUM_CHUNK_SIZE);
final boolean isTraceEnabled = logger.isTraceEnabled();
for (Iterator it = durableCqs.iterator(); it.hasNext();) {
Object durableCqName = it.next();
durableCqList.add(durableCqName);
if (isTraceEnabled) {
- logger.trace("{}: getDurableCqsResponse <{}>; list size was {}", servConn.getName(),
+ logger.trace("{}: getDurableCqsResponse <{}>; list size was {}", serverConnection.getName(),
durableCqName, durableCqList.size());
}
- if (durableCqList.size() == maximumChunkSize) {
+ if (durableCqList.size() == MAXIMUM_CHUNK_SIZE) {
// Send the chunk and clear the list
- sendDurableCqsResponseChunk(durableCqList, false, servConn);
+ sendDurableCqsResponseChunk(durableCqList, false, serverConnection);
durableCqList.clear();
}
}
// Send the last chunk even if the list is of zero size.
- sendDurableCqsResponseChunk(durableCqList, true, servConn);
+ sendDurableCqsResponseChunk(durableCqList, true, serverConnection);
} catch (CqException cqe) {
- sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(), cqe, servConn);
+ sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", clientMessage.getTransactionId(), cqe, serverConnection);
return;
} catch (Exception e) {
- writeChunkedException(msg, e, false, servConn);
+ writeChunkedException(clientMessage, e, serverConnection);
return;
}
}
@@ -114,7 +114,7 @@ public class GetDurableCQs extends BaseCQCommand {
chunkedResponseMsg.setNumberOfParts(1);
chunkedResponseMsg.setLastChunk(lastChunk);
- chunkedResponseMsg.addObjPart(list, zipValues);
+ chunkedResponseMsg.addObjPart(list, false);
if (logger.isDebugEnabled()) {
logger.debug("{}: Sending {} durableCQs response chunk{}", servConn.getName(),
http://git-wip-us.apache.org/repos/asf/geode/blob/db81427f/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/MonitorCQ.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/MonitorCQ.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/MonitorCQ.java
index a8fec9f..5393e81 100644
--- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/MonitorCQ.java
+++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/MonitorCQ.java
@@ -36,39 +36,38 @@ public class MonitorCQ extends BaseCQCommand {
private MonitorCQ() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- servConn.setAsTrue(REQUIRES_RESPONSE);
- servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException {
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
- int op = msg.getPart(0).getInt();
+ int op = clientMessage.getPart(0).getInt();
if (op < 1) {
// This should have been taken care at the client - remove?
String err = LocalizedStrings.MonitorCQ__0_THE_MONITORCQ_OPERATION_IS_INVALID
- .toLocalizedString(servConn.getName());
- sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg.getTransactionId(), null, servConn);
+ .toLocalizedString(serverConnection.getName());
+ sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, clientMessage.getTransactionId(), null, serverConnection);
return;
}
String regionName = null;
- if (msg.getNumberOfParts() == 2) {
+ if (clientMessage.getNumberOfParts() == 2) {
// This will be enable/disable on region.
- regionName = msg.getPart(1).getString();
+ regionName = clientMessage.getPart(1).getString();
if (regionName == null) {
// This should have been taken care at the client - remove?
String err =
LocalizedStrings.MonitorCQ__0_A_NULL_REGION_NAME_WAS_PASSED_FOR_MONITORCQ_OPERATION
- .toLocalizedString(servConn.getName());
- sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg.getTransactionId(), null,
- servConn);
+ .toLocalizedString(serverConnection.getName());
+ sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, clientMessage.getTransactionId(), null, serverConnection);
return;
}
}
if (logger.isDebugEnabled()) {
- logger.debug("{}: Received MonitorCq request from {} op: {}{}", servConn.getName(),
- servConn.getSocketString(), op, (regionName != null) ? " RegionName: " + regionName : "");
+ logger.debug("{}: Received MonitorCq request from {} op: {}{}", serverConnection.getName(),
+ serverConnection.getSocketString(), op, (regionName != null) ? " RegionName: " + regionName : "");
}
this.securityService.authorizeClusterRead();
@@ -85,12 +84,12 @@ public class MonitorCQ extends BaseCQCommand {
throw new CqException(
LocalizedStrings.CqService_INVALID_CQ_MONITOR_REQUEST_RECEIVED.toLocalizedString());
} catch (CqException cqe) {
- sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(), cqe, servConn);
+ sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", clientMessage.getTransactionId(), cqe, serverConnection);
return;
} catch (Exception e) {
String err = LocalizedStrings.MonitorCQ_EXCEPTION_WHILE_HANDLING_THE_MONITOR_REQUEST_OP_IS_0
.toLocalizedString(Integer.valueOf(op));
- sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, msg.getTransactionId(), e, servConn);
+ sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, clientMessage.getTransactionId(), e, serverConnection);
return;
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/db81427f/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/StopCQ.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/StopCQ.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/StopCQ.java
index 94304d3..070cb04 100644
--- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/StopCQ.java
+++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/StopCQ.java
@@ -44,30 +44,30 @@ public class StopCQ extends BaseCQCommand {
private StopCQ() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- ClientProxyMembershipID id = servConn.getProxyID();
- CacheServerStats stats = servConn.getCacheServerStats();
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException {
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ ClientProxyMembershipID id = serverConnection.getProxyID();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
// Based on MessageType.QUERY
// Added by Rao 2/1/2007
- servConn.setAsTrue(REQUIRES_RESPONSE);
- servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
start = DistributionStats.getStatTime();
// Retrieve the data from the message parts
- String cqName = msg.getPart(0).getString();
+ String cqName = clientMessage.getPart(0).getString();
if (logger.isDebugEnabled()) {
- logger.debug("{}: Received stop CQ request from {} cqName: {}", servConn.getName(),
- servConn.getSocketString(), cqName);
+ logger.debug("{}: Received stop CQ request from {} cqName: {}", serverConnection.getName(),
+ serverConnection.getSocketString(), cqName);
}
// Process the query request
if (cqName == null) {
String err =
LocalizedStrings.StopCQ_THE_CQNAME_FOR_THE_CQ_STOP_REQUEST_IS_NULL.toLocalizedString();
- sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg.getTransactionId(), null, servConn);
+ sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, clientMessage.getTransactionId(), null, serverConnection);
return;
}
@@ -86,7 +86,7 @@ public class StopCQ extends BaseCQCommand {
this.securityService.authorizeDataManage();
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
String queryStr = null;
Set cqRegionNames = null;
@@ -100,23 +100,23 @@ public class StopCQ extends BaseCQCommand {
}
cqService.stopCq(cqName, id);
if (cqQuery != null)
- servConn.removeCq(cqName, cqQuery.isDurable());
+ serverConnection.removeCq(cqName, cqQuery.isDurable());
} catch (CqException cqe) {
- sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(), cqe, servConn);
+ sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", clientMessage.getTransactionId(), cqe, serverConnection);
return;
} catch (Exception e) {
String err =
LocalizedStrings.StopCQ_EXCEPTION_WHILE_STOPPING_CQ_NAMED_0.toLocalizedString(cqName);
- sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, msg.getTransactionId(), e, servConn);
+ sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, clientMessage.getTransactionId(), e, serverConnection);
return;
}
// Send OK to client
sendCqResponse(MessageType.REPLY,
- LocalizedStrings.StopCQ_CQ_STOPPED_SUCCESSFULLY.toLocalizedString(), msg.getTransactionId(),
- null, servConn);
+ LocalizedStrings.StopCQ_CQ_STOPPED_SUCCESSFULLY.toLocalizedString(), clientMessage.getTransactionId(),
+ null, serverConnection);
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
{
long oldStart = start;