You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/31 23:15:28 UTC
[29/35] geode git commit: GEODE-2632: refactoring preparations for
SecurityService and BaseCommand changes
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
index 9ed00be..1fb8c8c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
@@ -23,21 +23,20 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.regex.Pattern;
+import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import org.apache.logging.log4j.Logger;
-import org.apache.geode.CancelException;
import org.apache.geode.CopyException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.SerializationException;
import org.apache.geode.SystemFailure;
-import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheLoaderException;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.InterestResultPolicy;
@@ -86,24 +85,13 @@ import org.apache.geode.security.GemFireSecurityException;
public abstract class BaseCommand implements Command {
protected static final Logger logger = LogService.getLogger();
- /**
- * Whether zipped values are being passed to/from the client. Can be modified using the system
- * property Message.ZIP_VALUES ? This does not appear to happen anywhere
- */
- protected static final boolean zipValues = false;
-
- protected static final boolean APPLY_RETRIES =
- Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "gateway.ApplyRetries");
-
- public static final byte[] OK_BYTES = new byte[] {0};
-
- public static final int maximumChunkSize =
- Integer.getInteger("BridgeServer.MAXIMUM_CHUNK_SIZE", 100).intValue();
+ private static final byte[] OK_BYTES = new byte[] {0};
- /** Maximum number of entries in each chunked response chunk */
+ public static final int MAXIMUM_CHUNK_SIZE =
+ Integer.getInteger("BridgeServer.MAXIMUM_CHUNK_SIZE", 100);
/** Whether to suppress logging of IOExceptions */
- private static boolean suppressIOExceptionLogging =
+ private static final boolean SUPPRESS_IO_EXCEPTION_LOGGING =
Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "bridge.suppressIOExceptionLogging");
/**
@@ -113,85 +101,89 @@ public abstract class BaseCommand implements Command {
* header.
*/
private static final int MAX_INCOMING_DATA =
- Integer.getInteger("BridgeServer.MAX_INCOMING_DATA", -1).intValue();
+ Integer.getInteger("BridgeServer.MAX_INCOMING_DATA", -1);
/**
* Maximum number of concurrent incoming client messages that a bridge server will allow. Once a
* server is working on this number additional incoming client messages will wait until one of
* them completes or fails.
*/
- private static final int MAX_INCOMING_MSGS =
- Integer.getInteger("BridgeServer.MAX_INCOMING_MSGS", -1).intValue();
+ private static final int MAX_INCOMING_MESSAGES =
+ Integer.getInteger("BridgeServer.MAX_INCOMING_MSGS", -1);
- private static final Semaphore incomingDataLimiter;
+ private static final Semaphore INCOMING_DATA_LIMITER;
+
+ private static final Semaphore INCOMING_MSG_LIMITER;
+
+ protected SecurityService securityService = IntegratedSecurityService.getSecurityService();
- private static final Semaphore incomingMsgLimiter;
static {
- Semaphore tmp;
+ Semaphore semaphore;
if (MAX_INCOMING_DATA > 0) {
// backport requires that this is fair since we inc by values > 1
- tmp = new Semaphore(MAX_INCOMING_DATA, true);
+ semaphore = new Semaphore(MAX_INCOMING_DATA, true);
} else {
- tmp = null;
+ semaphore = null;
}
- incomingDataLimiter = tmp;
- if (MAX_INCOMING_MSGS > 0) {
- tmp = new Semaphore(MAX_INCOMING_MSGS, false); // unfair for best
- // performance
+ INCOMING_DATA_LIMITER = semaphore;
+ if (MAX_INCOMING_MESSAGES > 0) {
+ // unfair for best performance
+ semaphore = new Semaphore(MAX_INCOMING_MESSAGES, false);
} else {
- tmp = null;
+ semaphore = null;
}
- incomingMsgLimiter = tmp;
-
+ INCOMING_MSG_LIMITER = semaphore;
}
- protected SecurityService securityService = IntegratedSecurityService.getSecurityService();
+ protected static byte[] okBytes() {
+ return OK_BYTES;
+ }
- public void execute(Message msg, ServerConnection servConn) {
+ @Override
+ public void execute(Message clientMessage, ServerConnection serverConnection) {
// Read the request and update the statistics
long start = DistributionStats.getStatTime();
- // servConn.resetTransientData();
- if (EntryLogger.isEnabled() && servConn != null) {
- EntryLogger.setSource(servConn.getMembershipID(), "c2s");
+ if (EntryLogger.isEnabled() && serverConnection != null) {
+ EntryLogger.setSource(serverConnection.getMembershipID(), "c2s");
}
- boolean shouldMasquerade = shouldMasqueradeForTx(msg, servConn);
+ boolean shouldMasquerade = shouldMasqueradeForTx(clientMessage, serverConnection);
try {
if (shouldMasquerade) {
- InternalCache cache = servConn.getCache();
+ InternalCache cache = serverConnection.getCache();
InternalDistributedMember member =
- (InternalDistributedMember) servConn.getProxyID().getDistributedMember();
+ (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember();
TXManagerImpl txMgr = cache.getTxManager();
TXStateProxy tx = null;
try {
- tx = txMgr.masqueradeAs(msg, member, false);
- cmdExecute(msg, servConn, start);
+ tx = txMgr.masqueradeAs(clientMessage, member, false);
+ cmdExecute(clientMessage, serverConnection, start);
tx.updateProxyServer(txMgr.getMemberId());
} finally {
txMgr.unmasquerade(tx);
}
} else {
- cmdExecute(msg, servConn, start);
+ cmdExecute(clientMessage, serverConnection, start);
}
} catch (TransactionException | CopyException | SerializationException | CacheWriterException
| CacheLoaderException | GemFireSecurityException | PartitionOfflineException
| MessageTooLargeException e) {
- handleExceptionNoDisconnect(msg, servConn, e);
+ handleExceptionNoDisconnect(clientMessage, serverConnection, e);
} catch (EOFException eof) {
- BaseCommand.handleEOFException(msg, servConn, eof);
+ BaseCommand.handleEOFException(clientMessage, serverConnection, eof);
} catch (InterruptedIOException e) { // Solaris only
- BaseCommand.handleInterruptedIOException(msg, servConn, e);
+ BaseCommand.handleInterruptedIOException(serverConnection, e);
} catch (IOException e) {
- BaseCommand.handleIOException(msg, servConn, e);
+ BaseCommand.handleIOException(clientMessage, serverConnection, e);
} catch (DistributedSystemDisconnectedException e) {
- BaseCommand.handleShutdownException(msg, servConn, e);
+ BaseCommand.handleShutdownException(clientMessage, serverConnection, e);
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable e) {
- BaseCommand.handleThrowable(msg, servConn, e);
+ BaseCommand.handleThrowable(clientMessage, serverConnection, e);
} finally {
EntryLogger.clearSource();
}
@@ -201,16 +193,12 @@ public abstract class BaseCommand implements Command {
* checks to see if this thread needs to masquerade as a transactional thread. clients after
* GFE_66 should be able to start a transaction.
*
- * @param msg
- * @param servConn
* @return true if thread should masquerade as a transactional thread.
*/
- protected boolean shouldMasqueradeForTx(Message msg, ServerConnection servConn) {
- if (servConn.getClientVersion().compareTo(Version.GFE_66) >= 0
- && msg.getTransactionId() > TXManagerImpl.NOTX) {
- return true;
- }
- return false;
+ protected boolean shouldMasqueradeForTx(Message clientMessage,
+ ServerConnection serverConnection) {
+ return serverConnection.getClientVersion().compareTo(Version.GFE_66) >= 0
+ && clientMessage.getTransactionId() > TXManagerImpl.NOTX;
}
/**
@@ -221,13 +209,11 @@ public abstract class BaseCommand implements Command {
* <p>
* The client event should have the event identifier from the client and the region affected by
* the operation.
- *
- * @param clientEvent
*/
public boolean recoverVersionTagForRetriedOperation(EntryEventImpl clientEvent) {
LocalRegion r = clientEvent.getRegion();
- VersionTag tag = null;
- if ((clientEvent.getVersionTag() != null) && (clientEvent.getVersionTag().isGatewayTag())) {
+ VersionTag tag;
+ if (clientEvent.getVersionTag() != null && clientEvent.getVersionTag().isGatewayTag()) {
tag = r.findVersionTagForGatewayEvent(clientEvent.getEventId());
} else {
tag = r.findVersionTagForClientEvent(clientEvent.getEventId());
@@ -246,7 +232,7 @@ public abstract class BaseCommand implements Command {
}
clientEvent.setVersionTag(tag);
}
- return (tag != null);
+ return tag != null;
}
/**
@@ -258,18 +244,18 @@ public abstract class BaseCommand implements Command {
* The client event should have the event identifier from the client and the region affected by
* the operation.
*/
- protected VersionTag findVersionTagsForRetriedBulkOp(LocalRegion r, EventID eventID) {
- VersionTag tag = r.findVersionTagForClientBulkOp(eventID);
+ protected VersionTag findVersionTagsForRetriedBulkOp(LocalRegion region, EventID eventID) {
+ VersionTag tag = region.findVersionTagForClientBulkOp(eventID);
if (tag != null) {
if (logger.isDebugEnabled()) {
logger.debug("recovered version tag {} for replayed bulk operation {}", tag, eventID);
}
return tag;
}
- if (r instanceof DistributedRegion || r instanceof PartitionedRegion) {
+ if (region instanceof DistributedRegion || region instanceof PartitionedRegion) {
// TODO this could be optimized for partitioned regions by sending the key
// so that the PR could look at an individual bucket for the event
- tag = FindVersionTagOperation.findVersionTag(r, eventID, true);
+ tag = FindVersionTagOperation.findVersionTag(region, eventID, true);
}
if (tag != null) {
if (logger.isDebugEnabled()) {
@@ -279,285 +265,249 @@ public abstract class BaseCommand implements Command {
return tag;
}
- abstract public void cmdExecute(Message msg, ServerConnection servConn, long start)
- throws IOException, ClassNotFoundException, InterruptedException;
+ public abstract void cmdExecute(Message clientMessage, ServerConnection serverConnection,
+ long start) throws IOException, ClassNotFoundException, InterruptedException;
- protected void writeReply(Message origMsg, ServerConnection servConn) throws IOException {
- Message replyMsg = servConn.getReplyMessage();
- servConn.getCache().getCancelCriterion().checkCancelInProgress(null);
+ protected void writeReply(Message origMsg, ServerConnection serverConnection) throws IOException {
+ Message replyMsg = serverConnection.getReplyMessage();
+ serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null);
replyMsg.setMessageType(MessageType.REPLY);
replyMsg.setNumberOfParts(1);
replyMsg.setTransactionId(origMsg.getTransactionId());
- replyMsg.addBytesPart(OK_BYTES);
- replyMsg.send(servConn);
+ replyMsg.addBytesPart(okBytes());
+ replyMsg.send(serverConnection);
if (logger.isTraceEnabled()) {
- logger.trace("{}: rpl tx: {}", servConn.getName(), origMsg.getTransactionId());
+ logger.trace("{}: rpl tx: {}", serverConnection.getName(), origMsg.getTransactionId());
}
}
- protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection servConn,
+ protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection serverConnection,
PartitionedRegion pr, byte nwHop) throws IOException {
- Message replyMsg = servConn.getReplyMessage();
- servConn.getCache().getCancelCriterion().checkCancelInProgress(null);
+ Message replyMsg = serverConnection.getReplyMessage();
+ serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null);
replyMsg.setMessageType(MessageType.REPLY);
replyMsg.setNumberOfParts(1);
replyMsg.setTransactionId(origMsg.getTransactionId());
replyMsg.addBytesPart(new byte[] {pr.getMetadataVersion(), nwHop});
- replyMsg.send(servConn);
+ replyMsg.send(serverConnection);
pr.getPrStats().incPRMetaDataSentCount();
if (logger.isTraceEnabled()) {
- logger.trace("{}: rpl with REFRESH_METADAT tx: {}", servConn.getName(),
+ logger.trace("{}: rpl with REFRESH_METADATA tx: {}", serverConnection.getName(),
origMsg.getTransactionId());
}
}
- private static void handleEOFException(Message msg, ServerConnection servConn, Exception eof) {
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- CacheServerStats stats = servConn.getCacheServerStats();
- boolean potentialModification = servConn.getPotentialModification();
+ private static void handleEOFException(Message msg, ServerConnection serverConnection,
+ Exception eof) {
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
+ boolean potentialModification = serverConnection.getPotentialModification();
if (!crHelper.isShutdown()) {
if (potentialModification) {
stats.incAbandonedWriteRequests();
} else {
stats.incAbandonedReadRequests();
}
- if (!suppressIOExceptionLogging) {
+ if (!SUPPRESS_IO_EXCEPTION_LOGGING) {
if (potentialModification) {
- int transId = (msg != null) ? msg.getTransactionId() : Integer.MIN_VALUE;
+ int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE;
logger.warn(LocalizedMessage.create(
LocalizedStrings.BaseCommand_0_EOFEXCEPTION_DURING_A_WRITE_OPERATION_ON_REGION__1_KEY_2_MESSAGEID_3,
- new Object[] {servConn.getName(), servConn.getModRegion(), servConn.getModKey(),
- Integer.valueOf(transId)}));
+ new Object[] {serverConnection.getName(), serverConnection.getModRegion(),
+ serverConnection.getModKey(), transId}));
} else {
logger.debug("EOF exception", eof);
logger.info(LocalizedMessage.create(
LocalizedStrings.BaseCommand_0_CONNECTION_DISCONNECT_DETECTED_BY_EOF,
- servConn.getName()));
+ serverConnection.getName()));
}
}
}
- servConn.setFlagProcessMessagesAsFalse();
- servConn.setClientDisconnectedException(eof);
+ serverConnection.setFlagProcessMessagesAsFalse();
+ serverConnection.setClientDisconnectedException(eof);
}
- private static void handleInterruptedIOException(Message msg, ServerConnection servConn,
- Exception e) {
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- if (!crHelper.isShutdown() && servConn.isOpen()) {
- if (!suppressIOExceptionLogging) {
+ private static void handleInterruptedIOException(ServerConnection serverConnection, Exception e) {
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ if (!crHelper.isShutdown() && serverConnection.isOpen()) {
+ if (!SUPPRESS_IO_EXCEPTION_LOGGING) {
if (logger.isDebugEnabled())
logger.debug("Aborted message due to interrupt: {}", e.getMessage(), e);
}
}
- servConn.setFlagProcessMessagesAsFalse();
- servConn.setClientDisconnectedException(e);
+ serverConnection.setFlagProcessMessagesAsFalse();
+ serverConnection.setClientDisconnectedException(e);
}
- private static void handleIOException(Message msg, ServerConnection servConn, Exception e) {
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- boolean potentialModification = servConn.getPotentialModification();
+ private static void handleIOException(Message msg, ServerConnection serverConnection,
+ Exception e) {
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ boolean potentialModification = serverConnection.getPotentialModification();
- if (!crHelper.isShutdown() && servConn.isOpen()) {
- if (!suppressIOExceptionLogging) {
+ if (!crHelper.isShutdown() && serverConnection.isOpen()) {
+ if (!SUPPRESS_IO_EXCEPTION_LOGGING) {
if (potentialModification) {
- int transId = (msg != null) ? msg.getTransactionId() : Integer.MIN_VALUE;
+ int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE;
logger.warn(LocalizedMessage.create(
LocalizedStrings.BaseCommand_0_UNEXPECTED_IOEXCEPTION_DURING_OPERATION_FOR_REGION_1_KEY_2_MESSID_3,
- new Object[] {servConn.getName(), servConn.getModRegion(), servConn.getModKey(),
- Integer.valueOf(transId)}),
+ new Object[] {serverConnection.getName(), serverConnection.getModRegion(),
+ serverConnection.getModKey(), transId}),
e);
} else {
logger.warn(LocalizedMessage.create(LocalizedStrings.BaseCommand_0_UNEXPECTED_IOEXCEPTION,
- servConn.getName()), e);
+ serverConnection.getName()), e);
}
}
}
- servConn.setFlagProcessMessagesAsFalse();
- servConn.setClientDisconnectedException(e);
+ serverConnection.setFlagProcessMessagesAsFalse();
+ serverConnection.setClientDisconnectedException(e);
}
- private static void handleShutdownException(Message msg, ServerConnection servConn, Exception e) {
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- boolean potentialModification = servConn.getPotentialModification();
+ private static void handleShutdownException(Message msg, ServerConnection serverConnection,
+ Exception e) {
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ boolean potentialModification = serverConnection.getPotentialModification();
if (!crHelper.isShutdown()) {
if (potentialModification) {
- int transId = (msg != null) ? msg.getTransactionId() : Integer.MIN_VALUE;
+ int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE;
logger.warn(LocalizedMessage.create(
LocalizedStrings.BaseCommand_0_UNEXPECTED_SHUTDOWNEXCEPTION_DURING_OPERATION_ON_REGION_1_KEY_2_MESSAGEID_3,
- new Object[] {servConn.getName(), servConn.getModRegion(), servConn.getModKey(),
- Integer.valueOf(transId)}),
+ new Object[] {serverConnection.getName(), serverConnection.getModRegion(),
+ serverConnection.getModKey(), transId}),
e);
} else {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.BaseCommand_0_UNEXPECTED_SHUTDOWNEXCEPTION, servConn.getName()), e);
+ logger.warn(
+ LocalizedMessage.create(LocalizedStrings.BaseCommand_0_UNEXPECTED_SHUTDOWNEXCEPTION,
+ serverConnection.getName()),
+ e);
}
}
- servConn.setFlagProcessMessagesAsFalse();
- servConn.setClientDisconnectedException(e);
+ serverConnection.setFlagProcessMessagesAsFalse();
+ serverConnection.setClientDisconnectedException(e);
}
- // Handle GemfireSecurityExceptions separately since the connection should not
- // be terminated (by setting processMessages to false) unlike in
- // handleThrowable. Fixes bugs #38384 and #39392.
- // private static void handleGemfireSecurityException(Message msg,
- // ServerConnection servConn, GemFireSecurityException e) {
- //
- // boolean requiresResponse = servConn.getTransientFlag(REQUIRES_RESPONSE);
- // boolean responded = servConn.getTransientFlag(RESPONDED);
- // boolean requiresChunkedResponse = servConn
- // .getTransientFlag(REQUIRES_CHUNKED_RESPONSE);
- // boolean potentialModification = servConn.getPotentialModification();
- //
- // try {
- // try {
- // if (requiresResponse && !responded) {
- // if (requiresChunkedResponse) {
- // writeChunkedException(msg, e, false, servConn);
- // }
- // else {
- // writeException(msg, e, false, servConn);
- // }
- // servConn.setAsTrue(RESPONDED);
- // }
- // }
- // finally { // inner try-finally to ensure proper ordering of logging
- // if (potentialModification) {
- // int transId = (msg != null) ? msg.getTransactionId()
- // : Integer.MIN_VALUE;
- // }
- // }
- // }
- // catch (IOException ioe) {
- // if (logger.isDebugEnabled()) {
- // logger.fine(servConn.getName()
- // + ": Unexpected IOException writing security exception: ", ioe);
- // }
- // }
- // }
-
- private static void handleExceptionNoDisconnect(Message msg, ServerConnection servConn,
+ private static void handleExceptionNoDisconnect(Message msg, ServerConnection serverConnection,
Exception e) {
- boolean requiresResponse = servConn.getTransientFlag(REQUIRES_RESPONSE);
- boolean responded = servConn.getTransientFlag(RESPONDED);
- boolean requiresChunkedResponse = servConn.getTransientFlag(REQUIRES_CHUNKED_RESPONSE);
- boolean potentialModification = servConn.getPotentialModification();
- boolean wroteExceptionResponse = false;
+ boolean requiresResponse = serverConnection.getTransientFlag(REQUIRES_RESPONSE);
+ boolean responded = serverConnection.getTransientFlag(RESPONDED);
+ boolean requiresChunkedResponse = serverConnection.getTransientFlag(REQUIRES_CHUNKED_RESPONSE);
+ boolean potentialModification = serverConnection.getPotentialModification();
try {
+ boolean wroteExceptionResponse = false;
try {
if (requiresResponse && !responded) {
if (requiresChunkedResponse) {
- writeChunkedException(msg, e, false, servConn);
+ writeChunkedException(msg, e, serverConnection);
} else {
- writeException(msg, e, false, servConn);
+ writeException(msg, e, false, serverConnection);
}
wroteExceptionResponse = true;
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
}
} finally { // inner try-finally to ensure proper ordering of logging
if (potentialModification) {
- int transId = (msg != null) ? msg.getTransactionId() : Integer.MIN_VALUE;
+ int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE;
if (!wroteExceptionResponse) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION_DURING_OPERATION_ON_REGION_1_KEY_2_MESSAGEID_3,
- new Object[] {servConn.getName(), servConn.getModRegion(), servConn.getModKey(),
- Integer.valueOf(transId)}),
+ new Object[] {serverConnection.getName(), serverConnection.getModRegion(),
+ serverConnection.getModKey(), transId}),
e);
} else {
if (logger.isDebugEnabled()) {
logger.debug("{}: Exception during operation on region: {} key: {} messageId: {}",
- servConn.getName(), servConn.getModRegion(), servConn.getModKey(), transId, e);
+ serverConnection.getName(), serverConnection.getModRegion(),
+ serverConnection.getModKey(), transId, e);
}
}
} else {
if (!wroteExceptionResponse) {
logger.warn(LocalizedMessage.create(LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION,
- servConn.getName()), e);
+ serverConnection.getName()), e);
} else {
if (logger.isDebugEnabled()) {
- logger.debug("{}: Exception: {}", servConn.getName(), e.getMessage(), e);
+ logger.debug("{}: Exception: {}", serverConnection.getName(), e.getMessage(), e);
}
}
}
}
} catch (IOException ioe) {
if (logger.isDebugEnabled()) {
- logger.debug("{}: Unexpected IOException writing exception: {}", servConn.getName(),
+ logger.debug("{}: Unexpected IOException writing exception: {}", serverConnection.getName(),
ioe.getMessage(), ioe);
}
}
}
- private static void handleThrowable(Message msg, ServerConnection servConn, Throwable th) {
- boolean requiresResponse = servConn.getTransientFlag(REQUIRES_RESPONSE);
- boolean responded = servConn.getTransientFlag(RESPONDED);
- boolean requiresChunkedResponse = servConn.getTransientFlag(REQUIRES_CHUNKED_RESPONSE);
- boolean potentialModification = servConn.getPotentialModification();
+ private static void handleThrowable(Message msg, ServerConnection serverConnection,
+ Throwable th) {
+ boolean requiresResponse = serverConnection.getTransientFlag(REQUIRES_RESPONSE);
+ boolean responded = serverConnection.getTransientFlag(RESPONDED);
+ boolean requiresChunkedResponse = serverConnection.getTransientFlag(REQUIRES_CHUNKED_RESPONSE);
+ boolean potentialModification = serverConnection.getPotentialModification();
try {
try {
if (th instanceof Error) {
- logger.fatal(LocalizedMessage.create(
- LocalizedStrings.BaseCommand_0_UNEXPECTED_ERROR_ON_SERVER, servConn.getName()), th);
+ logger.fatal(
+ LocalizedMessage.create(LocalizedStrings.BaseCommand_0_UNEXPECTED_ERROR_ON_SERVER,
+ serverConnection.getName()),
+ th);
}
if (requiresResponse && !responded) {
if (requiresChunkedResponse) {
- writeChunkedException(msg, th, false, servConn);
+ writeChunkedException(msg, th, serverConnection);
} else {
- writeException(msg, th, false, servConn);
+ writeException(msg, th, false, serverConnection);
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
}
} finally { // inner try-finally to ensure proper ordering of logging
- if (th instanceof Error) {
- // log nothing
- } else if (th instanceof CancelException) {
- // log nothing
- } else {
+ if (!(th instanceof Error || th instanceof CacheLoaderException)) {
if (potentialModification) {
- int transId = (msg != null) ? msg.getTransactionId() : Integer.MIN_VALUE;
+ int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE;
logger.warn(LocalizedMessage.create(
LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION_DURING_OPERATION_ON_REGION_1_KEY_2_MESSAGEID_3,
- new Object[] {servConn.getName(), servConn.getModRegion(), servConn.getModKey(),
- Integer.valueOf(transId)}),
+ new Object[] {serverConnection.getName(), serverConnection.getModRegion(),
+ serverConnection.getModKey(), transId}),
th);
} else {
logger.warn(LocalizedMessage.create(LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION,
- servConn.getName()), th);
+ serverConnection.getName()), th);
}
}
}
} catch (IOException ioe) {
if (logger.isDebugEnabled()) {
- logger.debug("{}: Unexpected IOException writing exception: {}", servConn.getName(),
+ logger.debug("{}: Unexpected IOException writing exception: {}", serverConnection.getName(),
ioe.getMessage(), ioe);
}
} finally {
- servConn.setFlagProcessMessagesAsFalse();
- servConn.setClientDisconnectedException(th);
+ serverConnection.setFlagProcessMessagesAsFalse();
+ serverConnection.setClientDisconnectedException(th);
}
}
- protected static void writeChunkedException(Message origMsg, Throwable e, boolean isSevere,
- ServerConnection servConn) throws IOException {
- writeChunkedException(origMsg, e, isSevere, servConn, servConn.getChunkedResponseMessage());
+ protected static void writeChunkedException(Message origMsg, Throwable e,
+ ServerConnection serverConnection) throws IOException {
+ writeChunkedException(origMsg, e, serverConnection,
+ serverConnection.getChunkedResponseMessage());
}
- protected static void writeChunkedException(Message origMsg, Throwable e, boolean isSevere,
- ServerConnection servConn, ChunkedMessage originalReponse) throws IOException {
- writeChunkedException(origMsg, e, isSevere, servConn, originalReponse, 2);
+ protected static void writeChunkedException(Message origMsg, Throwable e,
+ ServerConnection serverConnection, ChunkedMessage originalResponse) throws IOException {
+ writeChunkedException(origMsg, e, serverConnection, originalResponse, 2);
}
- protected static void writeChunkedException(Message origMsg, Throwable exception,
- boolean isSevere, ServerConnection servConn, ChunkedMessage originalReponse, int numOfParts)
+ private static void writeChunkedException(Message origMsg, Throwable exception,
+ ServerConnection serverConnection, ChunkedMessage originalResponse, int numOfParts)
throws IOException {
- Throwable e = getClientException(servConn, exception);
- ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
- chunkedResponseMsg.setServerConnection(servConn);
- if (originalReponse.headerHasBeenSent()) {
- // chunkedResponseMsg = originalReponse;
- // fix for bug 35442
+ Throwable e = getClientException(serverConnection, exception);
+ ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage();
+ chunkedResponseMsg.setServerConnection(serverConnection);
+ if (originalResponse.headerHasBeenSent()) {
chunkedResponseMsg.setNumberOfParts(numOfParts);
chunkedResponseMsg.setLastChunkAndNumParts(true, numOfParts);
chunkedResponseMsg.addObjPart(e);
@@ -565,8 +515,8 @@ public abstract class BaseCommand implements Command {
chunkedResponseMsg.addStringPart(getExceptionTrace(e));
}
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sending exception chunk while reply in progress: {}", servConn.getName(),
- e.getMessage(), e);
+ logger.debug("{}: Sending exception chunk while reply in progress: {}",
+ serverConnection.getName(), e.getMessage(), e);
}
} else {
chunkedResponseMsg.setMessageType(MessageType.EXCEPTION);
@@ -579,10 +529,11 @@ public abstract class BaseCommand implements Command {
chunkedResponseMsg.addStringPart(getExceptionTrace(e));
}
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sending exception chunk: {}", servConn.getName(), e.getMessage(), e);
+ logger.debug("{}: Sending exception chunk: {}", serverConnection.getName(), e.getMessage(),
+ e);
}
}
- chunkedResponseMsg.sendChunk(servConn);
+ chunkedResponseMsg.sendChunk(serverConnection);
}
// Get the exception stacktrace for native clients
@@ -595,26 +546,25 @@ public abstract class BaseCommand implements Command {
}
protected static void writeException(Message origMsg, Throwable e, boolean isSevere,
- ServerConnection servConn) throws IOException {
- writeException(origMsg, MessageType.EXCEPTION, e, isSevere, servConn);
+ ServerConnection serverConnection) throws IOException {
+ writeException(origMsg, MessageType.EXCEPTION, e, isSevere, serverConnection);
}
- private static Throwable getClientException(ServerConnection servConn, Throwable e) {
- Cache cache = servConn.getCache();
- if (cache instanceof InternalCache) {
- InternalCache icache = (InternalCache) servConn.getCache();
- OldClientSupportService svc = icache.getService(OldClientSupportService.class);
+ private static Throwable getClientException(ServerConnection serverConnection, Throwable e) {
+ InternalCache cache = serverConnection.getCache();
+ if (cache != null) {
+ OldClientSupportService svc = cache.getService(OldClientSupportService.class);
if (svc != null) {
- return svc.getThrowable(e, servConn.getClientVersion());
+ return svc.getThrowable(e, serverConnection.getClientVersion());
}
}
return e;
}
protected static void writeException(Message origMsg, int msgType, Throwable e, boolean isSevere,
- ServerConnection servConn) throws IOException {
- Throwable theException = getClientException(servConn, e);
- Message errorMsg = servConn.getErrorResponseMessage();
+ ServerConnection serverConnection) throws IOException {
+ Throwable theException = getClientException(serverConnection, e);
+ Message errorMsg = serverConnection.getErrorResponseMessage();
errorMsg.setMessageType(msgType);
errorMsg.setNumberOfParts(2);
errorMsg.setTransactionId(origMsg.getTransactionId());
@@ -628,9 +578,9 @@ public abstract class BaseCommand implements Command {
}
errorMsg.addObjPart(theException);
errorMsg.addStringPart(getExceptionTrace(theException));
- errorMsg.send(servConn);
+ errorMsg.send(serverConnection);
if (logger.isDebugEnabled()) {
- logger.debug("{}: Wrote exception: {}", servConn.getName(), e.getMessage(), e);
+ logger.debug("{}: Wrote exception: {}", serverConnection.getName(), e.getMessage(), e);
}
if (e instanceof MessageTooLargeException) {
throw (IOException) e;
@@ -638,41 +588,41 @@ public abstract class BaseCommand implements Command {
}
protected static void writeErrorResponse(Message origMsg, int messageType,
- ServerConnection servConn) throws IOException {
- Message errorMsg = servConn.getErrorResponseMessage();
+ ServerConnection serverConnection) throws IOException {
+ Message errorMsg = serverConnection.getErrorResponseMessage();
errorMsg.setMessageType(messageType);
errorMsg.setNumberOfParts(1);
errorMsg.setTransactionId(origMsg.getTransactionId());
errorMsg.addStringPart(
LocalizedStrings.BaseCommand_INVALID_DATA_RECEIVED_PLEASE_SEE_THE_CACHE_SERVER_LOG_FILE_FOR_ADDITIONAL_DETAILS
.toLocalizedString());
- errorMsg.send(servConn);
+ errorMsg.send(serverConnection);
}
protected static void writeErrorResponse(Message origMsg, int messageType, String msg,
- ServerConnection servConn) throws IOException {
- Message errorMsg = servConn.getErrorResponseMessage();
+ ServerConnection serverConnection) throws IOException {
+ Message errorMsg = serverConnection.getErrorResponseMessage();
errorMsg.setMessageType(messageType);
errorMsg.setNumberOfParts(1);
errorMsg.setTransactionId(origMsg.getTransactionId());
errorMsg.addStringPart(msg);
- errorMsg.send(servConn);
+ errorMsg.send(serverConnection);
}
protected static void writeRegionDestroyedEx(Message msg, String regionName, String title,
- ServerConnection servConn) throws IOException {
- String reason = servConn.getName() + ": Region named " + regionName + title;
+ ServerConnection serverConnection) throws IOException {
+ String reason = serverConnection.getName() + ": Region named " + regionName + title;
RegionDestroyedException ex = new RegionDestroyedException(reason, regionName);
- if (servConn.getTransientFlag(REQUIRES_CHUNKED_RESPONSE)) {
- writeChunkedException(msg, ex, false, servConn);
+ if (serverConnection.getTransientFlag(REQUIRES_CHUNKED_RESPONSE)) {
+ writeChunkedException(msg, ex, serverConnection);
} else {
- writeException(msg, ex, false, servConn);
+ writeException(msg, ex, false, serverConnection);
}
}
protected static void writeResponse(Object data, Object callbackArg, Message origMsg,
- boolean isObject, ServerConnection servConn) throws IOException {
- Message responseMsg = servConn.getResponseMessage();
+ boolean isObject, ServerConnection serverConnection) throws IOException {
+ Message responseMsg = serverConnection.getResponseMessage();
responseMsg.setMessageType(MessageType.RESPONSE);
responseMsg.setTransactionId(origMsg.getTransactionId());
@@ -686,20 +636,20 @@ public abstract class BaseCommand implements Command {
responseMsg.addRawPart((byte[]) data, isObject);
} else {
Assert.assertTrue(isObject, "isObject should be true when value is not a byte[]");
- responseMsg.addObjPart(data, zipValues);
+ responseMsg.addObjPart(data, false);
}
if (callbackArg != null) {
responseMsg.addObjPart(callbackArg);
}
- servConn.getCache().getCancelCriterion().checkCancelInProgress(null);
- responseMsg.send(servConn);
+ serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null);
+ responseMsg.send(serverConnection);
origMsg.clearParts();
}
protected static void writeResponseWithRefreshMetadata(Object data, Object callbackArg,
- Message origMsg, boolean isObject, ServerConnection servConn, PartitionedRegion pr,
+ Message origMsg, boolean isObject, ServerConnection serverConnection, PartitionedRegion pr,
byte nwHop) throws IOException {
- Message responseMsg = servConn.getResponseMessage();
+ Message responseMsg = serverConnection.getResponseMessage();
responseMsg.setMessageType(MessageType.RESPONSE);
responseMsg.setTransactionId(origMsg.getTransactionId());
@@ -713,32 +663,32 @@ public abstract class BaseCommand implements Command {
responseMsg.addRawPart((byte[]) data, isObject);
} else {
Assert.assertTrue(isObject, "isObject should be true when value is not a byte[]");
- responseMsg.addObjPart(data, zipValues);
+ responseMsg.addObjPart(data, false);
}
if (callbackArg != null) {
responseMsg.addObjPart(callbackArg);
}
responseMsg.addBytesPart(new byte[] {pr.getMetadataVersion(), nwHop});
- servConn.getCache().getCancelCriterion().checkCancelInProgress(null);
- responseMsg.send(servConn);
+ serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null);
+ responseMsg.send(serverConnection);
origMsg.clearParts();
}
protected static void writeResponseWithFunctionAttribute(byte[] data, Message origMsg,
- ServerConnection servConn) throws IOException {
- Message responseMsg = servConn.getResponseMessage();
+ ServerConnection serverConnection) throws IOException {
+ Message responseMsg = serverConnection.getResponseMessage();
responseMsg.setMessageType(MessageType.RESPONSE);
responseMsg.setTransactionId(origMsg.getTransactionId());
responseMsg.setNumberOfParts(1);
responseMsg.addBytesPart(data);
- servConn.getCache().getCancelCriterion().checkCancelInProgress(null);
- responseMsg.send(servConn);
+ serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null);
+ responseMsg.send(serverConnection);
origMsg.clearParts();
}
- static protected void checkForInterrupt(ServerConnection servConn, Exception e)
+ protected static void checkForInterrupt(ServerConnection serverConnection, Exception e)
throws InterruptedException, InterruptedIOException {
- servConn.getCachedRegionHelper().checkCancelInProgress(e);
+ serverConnection.getCachedRegionHelper().checkCancelInProgress(e);
if (e instanceof InterruptedException) {
throw (InterruptedException) e;
}
@@ -747,37 +697,36 @@ public abstract class BaseCommand implements Command {
}
}
- protected static void writeQueryResponseChunk(Object queryResponseChunk,
- CollectionType collectionType, boolean lastChunk, ServerConnection servConn)
- throws IOException {
- ChunkedMessage queryResponseMsg = servConn.getQueryResponseMessage();
+ static void writeQueryResponseChunk(Object queryResponseChunk, CollectionType collectionType,
+ boolean lastChunk, ServerConnection serverConnection) throws IOException {
+ ChunkedMessage queryResponseMsg = serverConnection.getQueryResponseMessage();
queryResponseMsg.setNumberOfParts(2);
queryResponseMsg.setLastChunk(lastChunk);
- queryResponseMsg.addObjPart(collectionType, zipValues);
- queryResponseMsg.addObjPart(queryResponseChunk, zipValues);
- queryResponseMsg.sendChunk(servConn);
+ queryResponseMsg.addObjPart(collectionType, false);
+ queryResponseMsg.addObjPart(queryResponseChunk, false);
+ queryResponseMsg.sendChunk(serverConnection);
}
protected static void writeQueryResponseException(Message origMsg, Throwable exception,
- boolean isSevere, ServerConnection servConn) throws IOException {
- Throwable e = getClientException(servConn, exception);
- ChunkedMessage queryResponseMsg = servConn.getQueryResponseMessage();
- ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
+ ServerConnection serverConnection) throws IOException {
+ Throwable e = getClientException(serverConnection, exception);
+ ChunkedMessage queryResponseMsg = serverConnection.getQueryResponseMessage();
+ ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage();
if (queryResponseMsg.headerHasBeenSent()) {
// fix for bug 35442
// This client is expecting 2 parts in this message so send 2 parts
- queryResponseMsg.setServerConnection(servConn);
+ queryResponseMsg.setServerConnection(serverConnection);
queryResponseMsg.setNumberOfParts(2);
queryResponseMsg.setLastChunkAndNumParts(true, 2);
queryResponseMsg.addObjPart(e);
queryResponseMsg.addStringPart(getExceptionTrace(e));
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sending exception chunk while reply in progress: {}", servConn.getName(),
- e.getMessage(), e);
+ logger.debug("{}: Sending exception chunk while reply in progress: {}",
+ serverConnection.getName(), e.getMessage(), e);
}
- queryResponseMsg.sendChunk(servConn);
+ queryResponseMsg.sendChunk(serverConnection);
} else {
- chunkedResponseMsg.setServerConnection(servConn);
+ chunkedResponseMsg.setServerConnection(serverConnection);
chunkedResponseMsg.setMessageType(MessageType.EXCEPTION);
chunkedResponseMsg.setNumberOfParts(2);
chunkedResponseMsg.setLastChunkAndNumParts(true, 2);
@@ -786,19 +735,20 @@ public abstract class BaseCommand implements Command {
chunkedResponseMsg.addObjPart(e);
chunkedResponseMsg.addStringPart(getExceptionTrace(e));
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sending exception chunk: {}", servConn.getName(), e.getMessage(), e);
+ logger.debug("{}: Sending exception chunk: {}", serverConnection.getName(), e.getMessage(),
+ e);
}
- chunkedResponseMsg.sendChunk(servConn);
+ chunkedResponseMsg.sendChunk(serverConnection);
}
}
protected static void writeChunkedErrorResponse(Message origMsg, int messageType, String message,
- ServerConnection servConn) throws IOException {
+ ServerConnection serverConnection) throws IOException {
// Send chunked response header identifying error message
- ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
+ ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage();
if (logger.isDebugEnabled()) {
- logger.debug(servConn.getName() + ": Sending error message header type: " + messageType
- + " transaction: " + origMsg.getTransactionId());
+ logger.debug("{}: Sending error message header type: {} transaction: {}",
+ serverConnection.getName(), messageType, origMsg.getTransactionId());
}
chunkedResponseMsg.setMessageType(messageType);
chunkedResponseMsg.setTransactionId(origMsg.getTransactionId());
@@ -806,32 +756,32 @@ public abstract class BaseCommand implements Command {
// Send actual error
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sending error message chunk: {}", servConn.getName(), message);
+ logger.debug("{}: Sending error message chunk: {}", serverConnection.getName(), message);
}
chunkedResponseMsg.setNumberOfParts(1);
chunkedResponseMsg.setLastChunk(true);
chunkedResponseMsg.addStringPart(message);
- chunkedResponseMsg.sendChunk(servConn);
+ chunkedResponseMsg.sendChunk(serverConnection);
}
protected static void writeFunctionResponseException(Message origMsg, int messageType,
- String message, ServerConnection servConn, Throwable exception) throws IOException {
- Throwable e = getClientException(servConn, exception);
- ChunkedMessage functionResponseMsg = servConn.getFunctionResponseMessage();
- ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
+ ServerConnection serverConnection, Throwable exception) throws IOException {
+ Throwable e = getClientException(serverConnection, exception);
+ ChunkedMessage functionResponseMsg = serverConnection.getFunctionResponseMessage();
+ ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage();
if (functionResponseMsg.headerHasBeenSent()) {
- functionResponseMsg.setServerConnection(servConn);
+ functionResponseMsg.setServerConnection(serverConnection);
functionResponseMsg.setNumberOfParts(2);
functionResponseMsg.setLastChunkAndNumParts(true, 2);
functionResponseMsg.addObjPart(e);
functionResponseMsg.addStringPart(getExceptionTrace(e));
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sending exception chunk while reply in progress: {}", servConn.getName(),
- e.getMessage(), e);
+ logger.debug("{}: Sending exception chunk while reply in progress: {}",
+ serverConnection.getName(), e.getMessage(), e);
}
- functionResponseMsg.sendChunk(servConn);
+ functionResponseMsg.sendChunk(serverConnection);
} else {
- chunkedResponseMsg.setServerConnection(servConn);
+ chunkedResponseMsg.setServerConnection(serverConnection);
chunkedResponseMsg.setMessageType(messageType);
chunkedResponseMsg.setNumberOfParts(2);
chunkedResponseMsg.setLastChunkAndNumParts(true, 2);
@@ -840,9 +790,10 @@ public abstract class BaseCommand implements Command {
chunkedResponseMsg.addObjPart(e);
chunkedResponseMsg.addStringPart(getExceptionTrace(e));
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sending exception chunk: {}", servConn.getName(), e.getMessage(), e);
+ logger.debug("{}: Sending exception chunk: {}", serverConnection.getName(), e.getMessage(),
+ e);
}
- chunkedResponseMsg.sendChunk(servConn);
+ chunkedResponseMsg.sendChunk(serverConnection);
}
}
@@ -898,14 +849,14 @@ public abstract class BaseCommand implements Command {
Message requestMsg = null;
try {
requestMsg = servConn.getRequestMessage();
- requestMsg.recv(servConn, MAX_INCOMING_DATA, incomingDataLimiter, incomingMsgLimiter);
+ requestMsg.recv(servConn, MAX_INCOMING_DATA, INCOMING_DATA_LIMITER, INCOMING_MSG_LIMITER);
return requestMsg;
} catch (EOFException eof) {
handleEOFException(null, servConn, eof);
- // TODO:Asif: Check if there is any need for explicitly returning
+ // TODO: Check if there is any need for explicitly returning
} catch (InterruptedIOException e) { // Solaris only
- handleInterruptedIOException(null, servConn, e);
+ handleInterruptedIOException(servConn, e);
} catch (IOException e) {
handleIOException(null, servConn, e);
@@ -930,7 +881,7 @@ public abstract class BaseCommand implements Command {
fillAndSendRegisterInterestResponseChunks(region, riKey, interestType, false, policy, servConn);
}
- /*
+ /**
* serializeValues is unused for clients < GFE_80
*/
protected static void fillAndSendRegisterInterestResponseChunks(LocalRegion region, Object riKey,
@@ -959,20 +910,20 @@ public abstract class BaseCommand implements Command {
// Not supported yet
throw new InternalGemFireError(
LocalizedStrings.BaseCommand_NOT_YET_SUPPORTED.toLocalizedString());
+
case InterestType.FILTER_CLASS:
throw new InternalGemFireError(
LocalizedStrings.BaseCommand_NOT_YET_SUPPORTED.toLocalizedString());
- // handleFilter(region, (String)riKey, policy);
- // break;
- case InterestType.REGULAR_EXPRESSION: {
+
+ case InterestType.REGULAR_EXPRESSION:
String regEx = (String) riKey;
if (regEx.equals(".*")) {
handleAllKeys(region, policy, servConn);
} else {
handleRegEx(region, regEx, policy, servConn);
}
- }
break;
+
case InterestType.KEY:
if (riKey.equals("ALL_KEYS")) {
handleAllKeys(region, policy, servConn);
@@ -980,13 +931,13 @@ public abstract class BaseCommand implements Command {
handleSingleton(region, riKey, policy, servConn);
}
break;
+
default:
throw new InternalGemFireError(
LocalizedStrings.BaseCommand_UNKNOWN_INTEREST_TYPE.toLocalizedString());
}
}
- @SuppressWarnings("rawtypes")
private static void handleKeysValuesPolicy(LocalRegion region, Object riKey, int interestType,
boolean serializeValues, ServerConnection servConn) throws IOException {
if (riKey instanceof List) {
@@ -1002,9 +953,11 @@ public abstract class BaseCommand implements Command {
case InterestType.OQL_QUERY:
throw new InternalGemFireError(
LocalizedStrings.BaseCommand_NOT_YET_SUPPORTED.toLocalizedString());
+
case InterestType.FILTER_CLASS:
throw new InternalGemFireError(
LocalizedStrings.BaseCommand_NOT_YET_SUPPORTED.toLocalizedString());
+
case InterestType.REGULAR_EXPRESSION:
String regEx = (String) riKey;
if (regEx.equals(".*")) {
@@ -1013,6 +966,7 @@ public abstract class BaseCommand implements Command {
handleKVAllKeys(region, regEx, serializeValues, servConn);
}
break;
+
case InterestType.KEY:
if (riKey.equals("ALL_KEYS")) {
handleKVAllKeys(region, null, serializeValues, servConn);
@@ -1020,6 +974,7 @@ public abstract class BaseCommand implements Command {
handleKVSingleton(region, riKey, serializeValues, servConn);
}
break;
+
default:
throw new InternalGemFireError(
LocalizedStrings.BaseCommand_UNKNOWN_INTEREST_TYPE.toLocalizedString());
@@ -1029,18 +984,17 @@ public abstract class BaseCommand implements Command {
/**
* @param list is a List of entry keys
*/
- protected static void sendRegisterInterestResponseChunk(Region region, Object riKey,
- ArrayList list, boolean lastChunk, ServerConnection servConn) throws IOException {
+ private static void sendRegisterInterestResponseChunk(Region region, Object riKey, List list,
+ boolean lastChunk, ServerConnection servConn) throws IOException {
ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage();
chunkedResponseMsg.setNumberOfParts(1);
chunkedResponseMsg.setLastChunk(lastChunk);
- chunkedResponseMsg.addObjPart(list, zipValues);
- String regionName = (region == null) ? " null " : region.getFullPath();
+ chunkedResponseMsg.addObjPart(list, false);
+ String regionName = region == null ? " null " : region.getFullPath();
if (logger.isDebugEnabled()) {
- String str = servConn.getName() + ": Sending" + (lastChunk ? " last " : " ")
- + "register interest response chunk for region: " + regionName + " for keys: " + riKey
- + " chunk=<" + chunkedResponseMsg + ">";
- logger.debug(str);
+ logger.debug(
+ "{}: Sending{}register interest response chunk for region: {} for keys: {} chunk=<{}>",
+ servConn.getName(), lastChunk ? " last " : " ", regionName, riKey, chunkedResponseMsg);
}
chunkedResponseMsg.sendChunk(servConn);
@@ -1050,14 +1004,12 @@ public abstract class BaseCommand implements Command {
* Determines whether keys for destroyed entries (tombstones) should be sent to clients in
* register-interest results.
*
- * @param servConn
- * @param policy
* @return true if tombstones should be sent to the client
*/
private static boolean sendTombstonesInRIResults(ServerConnection servConn,
InterestResultPolicy policy) {
- return (policy == InterestResultPolicy.KEYS_VALUES)
- && (servConn.getClientVersion().compareTo(Version.GFE_80) >= 0);
+ return policy == InterestResultPolicy.KEYS_VALUES
+ && servConn.getClientVersion().compareTo(Version.GFE_80) >= 0;
}
/**
@@ -1066,7 +1018,6 @@ public abstract class BaseCommand implements Command {
* @param region the region
* @param keyList the list of keys
* @param policy the policy
- * @throws IOException
*/
private static void handleList(LocalRegion region, List keyList, InterestResultPolicy policy,
ServerConnection servConn) throws IOException {
@@ -1075,15 +1026,14 @@ public abstract class BaseCommand implements Command {
handleListPR((PartitionedRegion) region, keyList, policy, servConn);
return;
}
- ArrayList newKeyList = new ArrayList(maximumChunkSize);
+ List newKeyList = new ArrayList(MAXIMUM_CHUNK_SIZE);
// Handle list of keys
if (region != null) {
- for (Iterator it = keyList.iterator(); it.hasNext();) {
- Object entryKey = it.next();
- if (region.containsKey(entryKey) || (sendTombstonesInRIResults(servConn, policy)
- && region.containsTombstone(entryKey))) {
+ for (Object entryKey : keyList) {
+ if (region.containsKey(entryKey)
+ || sendTombstonesInRIResults(servConn, policy) && region.containsTombstone(entryKey)) {
- appendInterestResponseKey(region, keyList, entryKey, newKeyList, "list", servConn);
+ appendInterestResponseKey(region, keyList, entryKey, newKeyList, servConn);
}
}
}
@@ -1095,14 +1045,12 @@ public abstract class BaseCommand implements Command {
/**
* Handles both RR and PR cases
*/
- @SuppressWarnings("rawtypes")
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_PARAM_DEREF",
+ @SuppressWarnings(value = "NP_NULL_PARAM_DEREF",
justification = "Null value handled in sendNewRegisterInterestResponseChunk()")
private static void handleKVSingleton(LocalRegion region, Object entryKey,
boolean serializeValues, ServerConnection servConn) throws IOException {
- VersionedObjectList values = new VersionedObjectList(maximumChunkSize, true,
- region == null ? true : region.getAttributes().getConcurrencyChecksEnabled(),
- serializeValues);
+ VersionedObjectList values = new VersionedObjectList(MAXIMUM_CHUNK_SIZE, true,
+ region == null || region.getAttributes().getConcurrencyChecksEnabled(), serializeValues);
if (region != null) {
if (region.containsKey(entryKey) || region.containsTombstone(entryKey)) {
@@ -1126,15 +1074,14 @@ public abstract class BaseCommand implements Command {
* @param region the region
* @param entryKey the key
* @param policy the policy
- * @throws IOException
*/
private static void handleSingleton(LocalRegion region, Object entryKey,
InterestResultPolicy policy, ServerConnection servConn) throws IOException {
- ArrayList keyList = new ArrayList(1);
+ List keyList = new ArrayList(1);
if (region != null) {
if (region.containsKey(entryKey)
- || (sendTombstonesInRIResults(servConn, policy) && region.containsTombstone(entryKey))) {
- appendInterestResponseKey(region, entryKey, entryKey, keyList, "individual", servConn);
+ || sendTombstonesInRIResults(servConn, policy) && region.containsTombstone(entryKey)) {
+ appendInterestResponseKey(region, entryKey, entryKey, keyList, servConn);
}
}
// Send the last chunk (the only chunk for individual and list keys)
@@ -1147,15 +1094,13 @@ public abstract class BaseCommand implements Command {
*
* @param region the region
* @param policy the policy
- * @throws IOException
*/
private static void handleAllKeys(LocalRegion region, InterestResultPolicy policy,
ServerConnection servConn) throws IOException {
- ArrayList keyList = new ArrayList(maximumChunkSize);
+ List keyList = new ArrayList(MAXIMUM_CHUNK_SIZE);
if (region != null) {
- for (Iterator it = region.keySet(sendTombstonesInRIResults(servConn, policy)).iterator(); it
- .hasNext();) {
- appendInterestResponseKey(region, "ALL_KEYS", it.next(), keyList, "ALL_KEYS", servConn);
+ for (Object entryKey : region.keySet(sendTombstonesInRIResults(servConn, policy))) {
+ appendInterestResponseKey(region, "ALL_KEYS", entryKey, keyList, servConn);
}
}
// Send the last chunk (the only chunk for individual and list keys)
@@ -1163,30 +1108,19 @@ public abstract class BaseCommand implements Command {
sendRegisterInterestResponseChunk(region, "ALL_KEYS", keyList, true, servConn);
}
- /**
- * @param region
- * @param regex
- * @param serializeValues
- * @param servConn
- * @throws IOException
- */
private static void handleKVAllKeys(LocalRegion region, String regex, boolean serializeValues,
ServerConnection servConn) throws IOException {
- if (region != null && region instanceof PartitionedRegion) {
+ if (region instanceof PartitionedRegion) {
handleKVKeysPR((PartitionedRegion) region, regex, serializeValues, servConn);
return;
}
- VersionedObjectList values = new VersionedObjectList(maximumChunkSize, true,
- region == null ? true : region.getAttributes().getConcurrencyChecksEnabled(),
- serializeValues);
+ VersionedObjectList values = new VersionedObjectList(MAXIMUM_CHUNK_SIZE, true,
+ region == null || region.getAttributes().getConcurrencyChecksEnabled(), serializeValues);
if (region != null) {
- VersionTag versionTag = null;
- Object data = null;
-
Pattern keyPattern = null;
if (regex != null) {
keyPattern = Pattern.compile(regex);
@@ -1207,11 +1141,11 @@ public abstract class BaseCommand implements Command {
}
ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
- data = region.get(key, null, true, true, true, id, versionHolder, true);
- versionTag = versionHolder.getVersionTag();
+ Object data = region.get(key, null, true, true, true, id, versionHolder, true);
+ VersionTag versionTag = versionHolder.getVersionTag();
updateValues(values, key, data, versionTag);
- if (values.size() == maximumChunkSize) {
+ if (values.size() == MAXIMUM_CHUNK_SIZE) {
sendNewRegisterInterestResponseChunk(region, regex != null ? regex : "ALL_KEYS", values,
false, servConn);
values.clear();
@@ -1227,20 +1161,18 @@ public abstract class BaseCommand implements Command {
private static void handleKVKeysPR(PartitionedRegion region, Object keyInfo,
boolean serializeValues, ServerConnection servConn) throws IOException {
- int id = 0;
- HashMap<Integer, HashSet> bucketKeys = null;
- VersionedObjectList values = new VersionedObjectList(maximumChunkSize, true,
+ VersionedObjectList values = new VersionedObjectList(MAXIMUM_CHUNK_SIZE, true,
region.getConcurrencyChecksEnabled(), serializeValues);
- if (keyInfo != null && keyInfo instanceof List) {
- bucketKeys = new HashMap<Integer, HashSet>();
+ if (keyInfo instanceof List) {
+ HashMap<Integer, HashSet> bucketKeys = new HashMap<>();
for (Object key : (List) keyInfo) {
- id = PartitionedRegionHelper.getHashKey(region, null, key, null, null);
+ int id = PartitionedRegionHelper.getHashKey(region, null, key, null, null);
if (bucketKeys.containsKey(id)) {
bucketKeys.get(id).add(key);
} else {
- HashSet<Object> keys = new HashSet<Object>();
+ HashSet<Object> keys = new HashSet<>();
keys.add(key);
bucketKeys.put(id, keys);
}
@@ -1259,8 +1191,6 @@ public abstract class BaseCommand implements Command {
/**
* Copied from Get70.getValueAndIsObject(), except a minor change. (Make the method static instead
* of copying it here?)
- *
- * @param value
*/
private static void updateValues(VersionedObjectList values, Object key, Object value,
VersionTag versionTag) {
@@ -1274,8 +1204,7 @@ public abstract class BaseCommand implements Command {
boolean wasInvalid = false;
if (value instanceof CachedDeserializable) {
value = ((CachedDeserializable) value).getValue();
- } else if (value == Token.REMOVED_PHASE1 || value == Token.REMOVED_PHASE2
- || value == Token.DESTROYED || value == Token.TOMBSTONE) {
+ } else if (isRemovalToken(value)) {
value = null;
} else if (value == Token.INVALID || value == Token.LOCAL_INVALID) {
value = null; // fix for bug 35884
@@ -1292,19 +1221,23 @@ public abstract class BaseCommand implements Command {
}
}
+ private static boolean isRemovalToken(final Object value) {
+ return value == Token.REMOVED_PHASE1 || value == Token.REMOVED_PHASE2
+ || value == Token.DESTROYED || value == Token.TOMBSTONE;
+ }
+
public static void appendNewRegisterInterestResponseChunkFromLocal(LocalRegion region,
VersionedObjectList values, Object riKeys, Set keySet, ServerConnection servConn)
throws IOException {
ClientProxyMembershipID requestingClient = servConn == null ? null : servConn.getProxyID();
- for (Iterator it = keySet.iterator(); it.hasNext();) {
- Object key = it.next();
+ for (Object key : keySet) {
VersionTagHolder versionHolder = createVersionTagHolder();
Object value = region.get(key, null, true, true, true, requestingClient, versionHolder, true);
updateValues(values, key, value, versionHolder.getVersionTag());
- if (values.size() == maximumChunkSize) {
+ if (values.size() == MAXIMUM_CHUNK_SIZE) {
// Send the chunk and clear the list
// values.setKeys(null); // Now we need to send keys too.
sendNewRegisterInterestResponseChunk(region, riKeys != null ? riKeys : "ALL_KEYS", values,
@@ -1314,24 +1247,14 @@ public abstract class BaseCommand implements Command {
} // for
}
- /**
- *
- * @param region
- * @param values {@link VersionedObjectList}
- * @param riKeys
- * @param set set of entries
- * @param servConn
- * @throws IOException
- */
public static void appendNewRegisterInterestResponseChunk(LocalRegion region,
- VersionedObjectList values, Object riKeys, Set set, ServerConnection servConn)
+ VersionedObjectList values, Object riKeys, Set<Map.Entry> set, ServerConnection servConn)
throws IOException {
- for (Iterator<Map.Entry> it = set.iterator(); it.hasNext();) {
- Map.Entry entry = it.next(); // Region.Entry or Map.Entry
+ for (Entry entry : set) {
if (entry instanceof Region.Entry) { // local entries
- VersionTag vt = null;
- Object key = null;
- Object value = null;
+ VersionTag vt;
+ Object key;
+ Object value;
if (entry instanceof EntrySnapshot) {
vt = ((EntrySnapshot) entry).getVersionTag();
key = ((EntrySnapshot) entry).getRegionEntry().getKey();
@@ -1349,14 +1272,13 @@ public abstract class BaseCommand implements Command {
}
}
} else { // Map.Entry (remote entries)
- ArrayList list = (ArrayList) entry.getValue();
+ List list = (List) entry.getValue();
Object value = list.get(0);
VersionTag tag = (VersionTag) list.get(1);
updateValues(values, entry.getKey(), value, tag);
}
- if (values.size() == maximumChunkSize) {
+ if (values.size() == MAXIMUM_CHUNK_SIZE) {
// Send the chunk and clear the list
- // values.setKeys(null); // Now we need to send keys too.
sendNewRegisterInterestResponseChunk(region, riKeys != null ? riKeys : "ALL_KEYS", values,
false, servConn);
values.clear();
@@ -1369,25 +1291,18 @@ public abstract class BaseCommand implements Command {
ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage();
chunkedResponseMsg.setNumberOfParts(1);
chunkedResponseMsg.setLastChunk(lastChunk);
- chunkedResponseMsg.addObjPart(list, zipValues);
- String regionName = (region == null) ? " null " : region.getFullPath();
+ chunkedResponseMsg.addObjPart(list, false);
+ String regionName = region == null ? " null " : region.getFullPath();
if (logger.isDebugEnabled()) {
- String str = servConn.getName() + ": Sending" + (lastChunk ? " last " : " ")
- + "register interest response chunk for region: " + regionName + " for keys: " + riKey
- + " chunk=<" + chunkedResponseMsg + ">";
- logger.debug(str);
+ logger.debug(
+ "{}: Sending{}register interest response chunk for region: {} for keys: {} chunk=<{}>",
+ servConn.getName(), lastChunk ? " last " : " ", regionName, riKey, chunkedResponseMsg);
}
-
chunkedResponseMsg.sendChunk(servConn);
}
/**
* Process an interest request of type {@link InterestType#REGULAR_EXPRESSION}
- *
- * @param region the region
- * @param regex the regex
- * @param policy the policy
- * @throws IOException
*/
private static void handleRegEx(LocalRegion region, String regex, InterestResultPolicy policy,
ServerConnection servConn) throws IOException {
@@ -1396,13 +1311,11 @@ public abstract class BaseCommand implements Command {
handleRegExPR((PartitionedRegion) region, regex, policy, servConn);
return;
}
- ArrayList keyList = new ArrayList(maximumChunkSize);
+ List keyList = new ArrayList(MAXIMUM_CHUNK_SIZE);
// Handle the regex pattern
- Pattern keyPattern = Pattern.compile(regex);
if (region != null) {
- for (Iterator it = region.keySet(sendTombstonesInRIResults(servConn, policy)).iterator(); it
- .hasNext();) {
- Object entryKey = it.next();
+ Pattern keyPattern = Pattern.compile(regex);
+ for (Object entryKey : region.keySet(sendTombstonesInRIResults(servConn, policy))) {
if (!(entryKey instanceof String)) {
// key is not a String, cannot apply regex to this entry
continue;
@@ -1412,7 +1325,7 @@ public abstract class BaseCommand implements Command {
continue;
}
- appendInterestResponseKey(region, regex, entryKey, keyList, "regex", servConn);
+ appendInterestResponseKey(region, regex, entryKey, keyList, servConn);
}
}
// Send the last chunk (the only chunk for individual and list keys)
@@ -1422,19 +1335,15 @@ public abstract class BaseCommand implements Command {
/**
* Process an interest request of type {@link InterestType#REGULAR_EXPRESSION}
- *
- * @param region the region
- * @param regex the regex
- * @param policy the policy
- * @throws IOException
*/
private static void handleRegExPR(final PartitionedRegion region, final String regex,
final InterestResultPolicy policy, final ServerConnection servConn) throws IOException {
- final ArrayList keyList = new ArrayList(maximumChunkSize);
+ final List keyList = new ArrayList(MAXIMUM_CHUNK_SIZE);
region.getKeysWithRegEx(regex, sendTombstonesInRIResults(servConn, policy),
new PartitionedRegion.SetCollector() {
+ @Override
public void receiveSet(Set theSet) throws IOException {
- appendInterestResponseKeys(region, regex, theSet, keyList, "regex", servConn);
+ appendInterestResponseKeys(region, regex, theSet, keyList, servConn);
}
});
// Send the last chunk (the only chunk for individual and list keys)
@@ -1444,19 +1353,15 @@ public abstract class BaseCommand implements Command {
/**
* Process an interest request involving a list of keys
- *
- * @param region the region
- * @param keyList the list of keys
- * @param policy the policy
- * @throws IOException
*/
private static void handleListPR(final PartitionedRegion region, final List keyList,
final InterestResultPolicy policy, final ServerConnection servConn) throws IOException {
- final ArrayList newKeyList = new ArrayList(maximumChunkSize);
+ final List newKeyList = new ArrayList(MAXIMUM_CHUNK_SIZE);
region.getKeysWithList(keyList, sendTombstonesInRIResults(servConn, policy),
new PartitionedRegion.SetCollector() {
+ @Override
public void receiveSet(Set theSet) throws IOException {
- appendInterestResponseKeys(region, keyList, theSet, newKeyList, "list", servConn);
+ appendInterestResponseKeys(region, keyList, theSet, newKeyList, servConn);
}
});
// Send the last chunk (the only chunk for individual and list keys)
@@ -1464,34 +1369,29 @@ public abstract class BaseCommand implements Command {
sendRegisterInterestResponseChunk(region, keyList, newKeyList, true, servConn);
}
- @SuppressWarnings("rawtypes")
private static void handleKVList(final LocalRegion region, final List keyList,
boolean serializeValues, final ServerConnection servConn) throws IOException {
- if (region != null && region instanceof PartitionedRegion) {
+ if (region instanceof PartitionedRegion) {
handleKVKeysPR((PartitionedRegion) region, keyList, serializeValues, servConn);
return;
}
- VersionedObjectList values = new VersionedObjectList(maximumChunkSize, true,
- region == null ? true : region.getAttributes().getConcurrencyChecksEnabled(),
- serializeValues);
+ VersionedObjectList values = new VersionedObjectList(MAXIMUM_CHUNK_SIZE, true,
+ region == null || region.getAttributes().getConcurrencyChecksEnabled(), serializeValues);
// Handle list of keys
if (region != null) {
- VersionTag versionTag = null;
- Object data = null;
- for (Iterator it = keyList.iterator(); it.hasNext();) {
- Object key = it.next();
+ for (Object key : keyList) {
if (region.containsKey(key) || region.containsTombstone(key)) {
VersionTagHolder versionHolder = createVersionTagHolder();
ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
- data = region.get(key, null, true, true, true, id, versionHolder, true);
- versionTag = versionHolder.getVersionTag();
+ Object data = region.get(key, null, true, true, true, id, versionHolder, true);
+ VersionTag versionTag = versionHolder.getVersionTag();
updateValues(values, key, data, versionTag);
- if (values.size() == maximumChunkSize) {
+ if (values.size() == MAXIMUM_CHUNK_SIZE) {
// Send the chunk and clear the list
// values.setKeys(null); // Now we need to send keys too.
sendNewRegisterInterestResponseChunk(region, keyList, values, false, servConn);
@@ -1518,27 +1418,25 @@ public abstract class BaseCommand implements Command {
* @param riKey the registerInterest "key" (what the client is interested in)
* @param entryKey key we're responding to
* @param list list to append to
- * @param kind for debugging
*/
private static void appendInterestResponseKey(LocalRegion region, Object riKey, Object entryKey,
- ArrayList list, String kind, ServerConnection servConn) throws IOException {
+ List list, ServerConnection servConn) throws IOException {
list.add(entryKey);
if (logger.isDebugEnabled()) {
logger.debug("{}: appendInterestResponseKey <{}>; list size was {}; region: {}",
servConn.getName(), entryKey, list.size(), region.getFullPath());
}
- if (list.size() == maximumChunkSize) {
+ if (list.size() == MAXIMUM_CHUNK_SIZE) {
// Send the chunk and clear the list
sendRegisterInterestResponseChunk(region, riKey, list, false, servConn);
list.clear();
}
}
- protected static void appendInterestResponseKeys(LocalRegion region, Object riKey,
- Collection entryKeys, ArrayList collector, String riDescr, ServerConnection servConn)
- throws IOException {
- for (Iterator it = entryKeys.iterator(); it.hasNext();) {
- appendInterestResponseKey(region, riKey, it.next(), collector, riDescr, servConn);
+ private static void appendInterestResponseKeys(LocalRegion region, Object riKey,
+ Collection entryKeys, List collector, ServerConnection servConn) throws IOException {
+ for (final Object entryKey : entryKeys) {
+ appendInterestResponseKey(region, riKey, entryKey, collector, servConn);
}
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandQuery.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandQuery.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandQuery.java
index 5f7a8ef..adf702a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandQuery.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandQuery.java
@@ -193,11 +193,11 @@ public abstract class BaseCommandQuery extends BaseCommand {
}
}
- int numberOfChunks = (int) Math.ceil(selectResults.size() * 1.0 / maximumChunkSize);
+ int numberOfChunks = (int) Math.ceil(selectResults.size() * 1.0 / MAXIMUM_CHUNK_SIZE);
if (logger.isTraceEnabled()) {
logger.trace("{}: Query results size: {}: Entries in chunk: {}: Number of chunks: {}",
- servConn.getName(), selectResults.size(), maximumChunkSize, numberOfChunks);
+ servConn.getName(), selectResults.size(), MAXIMUM_CHUNK_SIZE, numberOfChunks);
}
long oldStart = start;
@@ -262,7 +262,7 @@ public abstract class BaseCommandQuery extends BaseCommand {
QueryInvalidException qie =
new QueryInvalidException(LocalizedStrings.BaseCommand_0_QUERYSTRING_IS_1
.toLocalizedString(new Object[] {e.getLocalizedMessage(), queryString}));
- writeQueryResponseException(msg, qie, false, servConn);
+ writeQueryResponseException(msg, qie, servConn);
return false;
} catch (DistributedSystemDisconnectedException se) {
if (msg != null && logger.isDebugEnabled()) {
@@ -282,7 +282,7 @@ public abstract class BaseCommandQuery extends BaseCommand {
if ((defaultQuery).isCanceled()) {
e = new QueryException(defaultQuery.getQueryCanceledException().getMessage(), e.getCause());
}
- writeQueryResponseException(msg, e, false, servConn);
+ writeQueryResponseException(msg, e, servConn);
return false;
} finally {
// Since the query object is being shared in case of bind queries,
@@ -375,8 +375,8 @@ public abstract class BaseCommandQuery extends BaseCommand {
if (logger.isTraceEnabled()) {
logger.trace("{}: Creating chunk: {}", servConn.getName(), j);
}
- Object[] results = new Object[maximumChunkSize];
- for (int i = 0; i < maximumChunkSize; i++) {
+ Object[] results = new Object[MAXIMUM_CHUNK_SIZE];
+ for (int i = 0; i < MAXIMUM_CHUNK_SIZE; i++) {
if ((resultIndex) == selectResults.size()) {
incompleteArray = true;
break;
@@ -427,9 +427,9 @@ public abstract class BaseCommandQuery extends BaseCommand {
if (incompleteArray) {
Object[] newResults;
if (cqQuery != null) {
- newResults = new Object[cqResultIndex % maximumChunkSize];
+ newResults = new Object[cqResultIndex % MAXIMUM_CHUNK_SIZE];
} else {
- newResults = new Object[resultIndex % maximumChunkSize];
+ newResults = new Object[resultIndex % MAXIMUM_CHUNK_SIZE];
}
for (int i = 0; i < newResults.length; i++) {
newResults[i] = results[i];
@@ -463,8 +463,8 @@ public abstract class BaseCommandQuery extends BaseCommand {
if (logger.isTraceEnabled()) {
logger.trace("{}: Creating chunk: {}", servConn.getName(), j);
}
- ObjectPartList serializedObjs = new ObjectPartList(maximumChunkSize, false);
- for (int i = 0; i < maximumChunkSize; i++) {
+ ObjectPartList serializedObjs = new ObjectPartList(MAXIMUM_CHUNK_SIZE, false);
+ for (int i = 0; i < MAXIMUM_CHUNK_SIZE; i++) {
if ((resultIndex) == objs.size()) {
break;
}
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index e79bfbd..fd5154f 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -245,7 +245,7 @@ public class CacheClientNotifier {
public void registerClient(Socket socket, boolean isPrimary, long acceptorId,
boolean notifyBySubscription) throws IOException {
// Since no remote ports were specified in the message, wait for them.
- long startTime = this._statistics.startTime();
+ long startTime = this.statistics.startTime();
DataInputStream dis = new DataInputStream(socket.getInputStream());
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
@@ -402,7 +402,7 @@ public class CacheClientNotifier {
return;
}
- this._statistics.endClientRegistration(startTime);
+ this.statistics.endClientRegistration(startTime);
}
/**
@@ -481,7 +481,7 @@ public class CacheClientNotifier {
"CacheClientNotifier: A proxy exists for durable client with id {}. This proxy will be reinitialized: {}",
proxyId.getDurableId(), l_proxy);
}
- this._statistics.incDurableReconnectionCount();
+ this.statistics.incDurableReconnectionCount();
l_proxy.getProxyID().updateDurableTimeout(proxyId.getDurableTimeout());
l_proxy.reinitialize(socket, proxyId, this.getCache(), isPrimary, clientConflation,
clientVersion);
@@ -796,7 +796,7 @@ public class CacheClientNotifier {
return;
}
- long startTime = this._statistics.startTime();
+ long startTime = this.statistics.startTime();
ClientUpdateMessageImpl clientMessage;
if (cmsg == null) {
@@ -893,7 +893,7 @@ public class CacheClientNotifier {
singletonRouteClientMessage(conflatable, filterClients);
- this._statistics.endEvent(startTime);
+ this.statistics.endEvent(startTime);
// Cleanup destroyed events in CQ result cache.
// While maintaining the CQ results key caching. the destroy event
@@ -1491,7 +1491,7 @@ public class CacheClientNotifier {
this.clientPingTask.cancel();
// Close the statistics
- this._statistics.close();
+ this.statistics.close();
this.socketCloser.close();
}
@@ -1836,7 +1836,7 @@ public class CacheClientNotifier {
* @return the statistics for the notifier
*/
public CacheClientNotifierStats getStats() {
- return this._statistics;
+ return this.statistics;
}
/**
@@ -1911,7 +1911,7 @@ public class CacheClientNotifier {
} else {
factory = this.getCache().getDistributedSystem();
}
- this._statistics = new CacheClientNotifierStats(factory);
+ this.statistics = new CacheClientNotifierStats(factory);
try {
this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY));
@@ -1932,88 +1932,6 @@ public class CacheClientNotifier {
scheduleClientPingTask();
}
- /**
- * this message is used to send interest registration to another server. Since interest
- * registration performs a state-flush operation this message must not transmitted on an ordered
- * socket
- */
- public static class ServerInterestRegistrationMessage extends HighPriorityDistributionMessage
- implements MessageWithReply {
- ClientProxyMembershipID clientId;
- ClientInterestMessageImpl clientMessage;
- int processorId;
-
- ServerInterestRegistrationMessage(ClientProxyMembershipID clientID,
- ClientInterestMessageImpl msg) {
- this.clientId = clientID;
- this.clientMessage = msg;
- }
-
- public ServerInterestRegistrationMessage() {}
-
- static void sendInterestChange(DM dm, ClientProxyMembershipID clientID,
- ClientInterestMessageImpl msg) {
- ServerInterestRegistrationMessage smsg = new ServerInterestRegistrationMessage(clientID, msg);
- Set recipients = dm.getOtherDistributionManagerIds();
- smsg.setRecipients(recipients);
- ReplyProcessor21 rp = new ReplyProcessor21(dm, recipients);
- smsg.processorId = rp.getProcessorId();
- dm.putOutgoing(smsg);
- try {
- rp.waitForReplies();
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- }
-
- @Override
- protected void process(DistributionManager dm) {
- // Get the proxy for the proxy id
- try {
- CacheClientNotifier ccn = CacheClientNotifier.getInstance();
- if (ccn != null) {
- CacheClientProxy proxy = ccn.getClientProxy(clientId);
- // If this VM contains a proxy for the requested proxy id, forward the
- // message on to the proxy for processing
- if (proxy != null) {
- proxy.processInterestMessage(this.clientMessage);
- }
- }
- } finally {
- ReplyMessage reply = new ReplyMessage();
- reply.setProcessorId(this.processorId);
- reply.setRecipient(getSender());
- try {
- dm.putOutgoing(reply);
- } catch (CancelException e) {
- // can't send a reply, so ignore the exception
- }
- }
- }
-
- public int getDSFID() {
- return SERVER_INTEREST_REGISTRATION_MESSAGE;
- }
-
- @Override
- public void toData(DataOutput out) throws IOException {
- super.toData(out);
- out.writeInt(this.processorId);
- InternalDataSerializer.invokeToData(this.clientId, out);
- InternalDataSerializer.invokeToData(this.clientMessage, out);
- }
-
- @Override
- public void fromData(DataInput in) throws IOException, ClassNotFoundException {
- super.fromData(in);
- this.processorId = in.readInt();
- this.clientId = new ClientProxyMembershipID();
- InternalDataSerializer.invokeFromData(this.clientId, in);
- this.clientMessage = new ClientInterestMessageImpl();
- InternalDataSerializer.invokeFromData(this.clientMessage, in);
- }
- }
-
protected void deliverInterestChange(ClientProxyMembershipID proxyID,
ClientInterestMessageImpl message) {
DM dm = ((InternalDistributedSystem) this.getCache().getDistributedSystem())
@@ -2032,11 +1950,11 @@ public class CacheClientNotifier {
public void addCompiledQuery(DefaultQuery query) {
if (this.compiledQueries.putIfAbsent(query.getQueryString(), query) == null) {
// Added successfully.
- this._statistics.incCompiledQueryCount(1);
+ this.statistics.incCompiledQueryCount(1);
if (logger.isDebugEnabled()) {
logger.debug(
"Added compiled query into ccn.compliedQueries list. Query: {}. Total compiled queries: {}",
- query.getQueryString(), this._statistics.getCompiledQueryCount());
+ query.getQueryString(), this.statistics.getCompiledQueryCount());
}
// Start the clearIdleCompiledQueries thread.
startCompiledQueryCleanupThread();
@@ -2049,12 +1967,12 @@ public class CacheClientNotifier {
private void clearCompiledQueries() {
if (this.compiledQueries.size() > 0) {
- this._statistics.incCompiledQueryCount(-(this.compiledQueries.size()));
+ this.statistics.incCompiledQueryCount(-(this.compiledQueries.size()));
this.compiledQueries.clear();
if (logger.isDebugEnabled()) {
logger.debug(
"Removed all compiled queries from ccn.compliedQueries list. Total compiled queries: {}",
- this._statistics.getCompiledQueryCount());
+ this.statistics.getCompiledQueryCount());
}
}
}
@@ -2082,11 +2000,11 @@ public class CacheClientNotifier {
} else {
if (compiledQueries.remove(e.getKey()) != null) {
// If successfully removed decrement the counter.
- _statistics.incCompiledQueryCount(-1);
+ statistics.incCompiledQueryCount(-1);
if (isDebugEnabled) {
logger.debug("Removed compiled query from ccn.compliedQueries list. Query: "
+ q.getQueryString() + ". Total compiled queries are : "
- + _statistics.getCompiledQueryCount());
+ + statistics.getCompiledQueryCount());
}
}
}
@@ -2224,7 +2142,7 @@ public class CacheClientNotifier {
/**
* The statistics for this notifier
*/
- protected final CacheClientNotifierStats _statistics;
+ protected final CacheClientNotifierStats statistics;
/**
* The <code>InterestRegistrationListener</code> instances registered in this VM. This is used
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 75c89ab..8450db9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -1951,7 +1951,7 @@ public class CacheClientProxy implements ClientSession {
// Close the proxy
terminateDispatching(false);
- _cacheClientNotifier._statistics.incQueueDroppedCount();
+ _cacheClientNotifier.statistics.incQueueDroppedCount();
/**
* Setting the expiration task to null again and cancelling existing one, if any. See
@@ -2850,7 +2850,7 @@ public class CacheClientProxy implements ClientSession {
try {
this._messageQueue.put(clientMessage);
if (this._proxy.isPaused() && this._proxy.isDurable()) {
- this._proxy._cacheClientNotifier._statistics.incEventEnqueuedWhileClientAwayCount();
+ this._proxy._cacheClientNotifier.statistics.incEventEnqueuedWhileClientAwayCount();
if (logger.isDebugEnabled()) {
logger.debug("{}: Queued message while Durable Client is away {}", this, clientMessage);
}