You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/31 23:15:20 UTC
[21/35] geode git commit: GEODE-2632: refactoring preparations for
SecurityService and BaseCommand changes
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Request.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Request.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Request.java
index f7baba4..6f97d31 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Request.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Request.java
@@ -52,15 +52,16 @@ public class Request extends BaseCommand {
Request() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
+ throws IOException {
Part regionNamePart = null, keyPart = null, valuePart = null;
String regionName = null;
Object callbackArg = null, key = null;
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- CacheServerStats stats = servConn.getCacheServerStats();
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
StringId errMessage = null;
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
// requiresResponse = true;
{
long oldStart = start;
@@ -68,18 +69,18 @@ public class Request extends BaseCommand {
stats.incReadGetRequestTime(start - oldStart);
}
// Retrieve the data from the message parts
- int parts = msg.getNumberOfParts();
- regionNamePart = msg.getPart(0);
- keyPart = msg.getPart(1);
+ int parts = clientMessage.getNumberOfParts();
+ regionNamePart = clientMessage.getPart(0);
+ keyPart = clientMessage.getPart(1);
// valuePart = null; (redundant assignment)
if (parts > 2) {
- valuePart = msg.getPart(2);
+ valuePart = clientMessage.getPart(2);
try {
callbackArg = valuePart.getObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
+ writeException(clientMessage, e, false, serverConnection);
// responded = true;
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
@@ -87,15 +88,15 @@ public class Request extends BaseCommand {
try {
key = keyPart.getStringOrObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
+ writeException(clientMessage, e, false, serverConnection);
// responded = true;
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
if (logger.isDebugEnabled()) {
logger.debug("{}: Received get request ({} bytes) from {} for region {} key {} txId {}",
- servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key,
- msg.getTransactionId());
+ serverConnection.getName(), clientMessage.getPayloadLength(),
+ serverConnection.getSocketString(), regionName, key, clientMessage.getTransactionId());
}
// Process the get request
@@ -109,31 +110,31 @@ public class Request extends BaseCommand {
errMessage = LocalizedStrings.Request_THE_INPUT_REGION_NAME_FOR_THE_GET_REQUEST_IS_NULL;
}
String s = errMessage.toLocalizedString();
- logger.warn("{}: {}", servConn.getName(), s);
- writeErrorResponse(msg, MessageType.REQUESTDATAERROR, s, servConn);
+ logger.warn("{}: {}", serverConnection.getName(), s);
+ writeErrorResponse(clientMessage, MessageType.REQUESTDATAERROR, s, serverConnection);
// responded = true;
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
} else {
- Region region = servConn.getCache().getRegion(regionName);
+ Region region = serverConnection.getCache().getRegion(regionName);
if (region == null) {
String reason = LocalizedStrings.Request__0_WAS_NOT_FOUND_DURING_GET_REQUEST
.toLocalizedString(regionName);
- writeRegionDestroyedEx(msg, regionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
} else {
GetOperationContext getContext = null;
try {
this.securityService.authorizeRegionRead(regionName, key.toString());
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
getContext = authzRequest.getAuthorize(regionName, key, callbackArg);
callbackArg = getContext.getCallbackArg();
}
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -141,10 +142,10 @@ public class Request extends BaseCommand {
// the value if it is a byte[].
Object[] valueAndIsObject = new Object[3];
try {
- getValueAndIsObject(region, key, callbackArg, servConn, valueAndIsObject);
+ getValueAndIsObject(region, key, callbackArg, serverConnection, valueAndIsObject);
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -154,7 +155,7 @@ public class Request extends BaseCommand {
try {
- AuthorizeRequestPP postAuthzRequest = servConn.getPostAuthzRequest();
+ AuthorizeRequestPP postAuthzRequest = serverConnection.getPostAuthzRequest();
if (postAuthzRequest != null) {
getContext = postAuthzRequest.getAuthorize(regionName, key, data, isObject, getContext);
byte[] serializedValue = getContext.getSerializedValue();
@@ -166,8 +167,8 @@ public class Request extends BaseCommand {
isObject = getContext.isObject();
}
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
{
@@ -179,20 +180,21 @@ public class Request extends BaseCommand {
if (region instanceof PartitionedRegion) {
PartitionedRegion pr = (PartitionedRegion) region;
if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) {
- writeResponseWithRefreshMetadata(data, callbackArg, msg, isObject, servConn, pr,
- pr.getNetworkHopType());
+ writeResponseWithRefreshMetadata(data, callbackArg, clientMessage, isObject,
+ serverConnection, pr, pr.getNetworkHopType());
pr.clearNetworkHopData();
} else {
- writeResponse(data, callbackArg, msg, isObject, servConn);
+ writeResponse(data, callbackArg, clientMessage, isObject, serverConnection);
}
} else {
- writeResponse(data, callbackArg, msg, isObject, servConn);
+ writeResponse(data, callbackArg, clientMessage, isObject, serverConnection);
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
if (logger.isDebugEnabled()) {
logger.debug("{}: Wrote get response back to {} for region {} key {} value: {}",
- servConn.getName(), servConn.getSocketString(), regionName, key, data);
+ serverConnection.getName(), serverConnection.getSocketString(), regionName, key,
+ data);
}
stats.incWriteGetResponseTime(DistributionStats.getStatTime() - start);
}
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java
index 3fd84d6..a6d6578 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java
@@ -49,57 +49,60 @@ public class RequestEventValue extends BaseCommand {
private RequestEventValue() {}
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
+ throws IOException {
Part eventIDPart = null, valuePart = null;
EventID event = null;
Object callbackArg = null;
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
StringBuffer errMessage = new StringBuffer();
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
// Retrieve the data from the message parts
- int parts = msg.getNumberOfParts();
- eventIDPart = msg.getPart(0);
+ int parts = clientMessage.getNumberOfParts();
+ eventIDPart = clientMessage.getPart(0);
if (eventIDPart == null) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.RequestEventValue_0_THE_EVENT_ID_FOR_THE_GET_EVENT_VALUE_REQUEST_IS_NULL,
- servConn.getName()));
+ serverConnection.getName()));
errMessage.append(" The event id for the get event value request is null.");
- writeErrorResponse(msg, MessageType.REQUESTDATAERROR, errMessage.toString(), servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.REQUESTDATAERROR, errMessage.toString(),
+ serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
} else {
try {
event = (EventID) eventIDPart.getObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
if (parts > 1) {
- valuePart = msg.getPart(1);
+ valuePart = clientMessage.getPart(1);
try {
if (valuePart != null) {
callbackArg = valuePart.getObject();
}
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
if (logger.isTraceEnabled()) {
- logger.trace("{}: Received get event value request ({} bytes) from {}", servConn.getName(),
- msg.getPayloadLength(), servConn.getSocketString());
+ logger.trace("{}: Received get event value request ({} bytes) from {}",
+ serverConnection.getName(), clientMessage.getPayloadLength(),
+ serverConnection.getSocketString());
}
- CacheClientNotifier ccn = servConn.getAcceptor().getCacheClientNotifier();
+ CacheClientNotifier ccn = serverConnection.getAcceptor().getCacheClientNotifier();
// Get the ha container.
HAContainerWrapper haContainer = (HAContainerWrapper) ccn.getHaContainer();
if (haContainer == null) {
String reason = " was not found during get event value request";
- writeRegionDestroyedEx(msg, "ha container", reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, "ha container", reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
} else {
Object[] valueAndIsObject = new Object[2];
try {
@@ -110,8 +113,9 @@ public class RequestEventValue extends BaseCommand {
LocalizedStrings.RequestEventValue_UNABLE_TO_FIND_A_CLIENT_UPDATE_MESSAGE_FOR_0,
event));
String msgStr = "No value found for " + event + " in " + haContainer.getName();
- writeErrorResponse(msg, MessageType.REQUEST_EVENT_VALUE_ERROR, msgStr, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.REQUEST_EVENT_VALUE_ERROR, msgStr,
+ serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
} else {
if (logger.isDebugEnabled()) {
@@ -130,20 +134,22 @@ public class RequestEventValue extends BaseCommand {
valueAndIsObject[1] = Boolean.valueOf(((ClientUpdateMessageImpl) data).valueIsObject());
}
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
Object data = valueAndIsObject[0];
boolean isObject = (Boolean) valueAndIsObject[1];
- writeResponse(data, callbackArg, msg, isObject, servConn);
- servConn.setAsTrue(RESPONDED);
- ccn.getClientProxy(servConn.getProxyID()).getStatistics().incDeltaFullMessagesSent();
+ writeResponse(data, callbackArg, clientMessage, isObject, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
+ ccn.getClientProxy(serverConnection.getProxyID()).getStatistics()
+ .incDeltaFullMessagesSent();
if (logger.isDebugEnabled()) {
logger.debug("{}: Wrote get event value response back to {} for ha container {}",
- servConn.getName(), servConn.getSocketString(), haContainer.getName());
+ serverConnection.getName(), serverConnection.getSocketString(),
+ haContainer.getName());
}
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RollbackCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RollbackCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RollbackCommand.java
index a579170..cd12ea7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RollbackCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RollbackCommand.java
@@ -39,18 +39,18 @@ public class RollbackCommand extends BaseCommand {
private RollbackCommand() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, ClassNotFoundException, InterruptedException {
- servConn.setAsTrue(REQUIRES_RESPONSE);
- TXManagerImpl txMgr = (TXManagerImpl) servConn.getCache().getCacheTransactionManager();
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
+ TXManagerImpl txMgr = (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager();
InternalDistributedMember client =
- (InternalDistributedMember) servConn.getProxyID().getDistributedMember();
- int uniqId = msg.getTransactionId();
+ (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember();
+ int uniqId = clientMessage.getTransactionId();
TXId txId = new TXId(client, uniqId);
if (txMgr.isHostedTxRecentlyCompleted(txId)) {
if (logger.isDebugEnabled()) {
logger.debug("TX: found a recently rolled back tx: {}", txId);
- sendRollbackReply(msg, servConn);
+ sendRollbackReply(clientMessage, serverConnection);
txMgr.removeHostedTXState(txId);
return;
}
@@ -60,16 +60,16 @@ public class RollbackCommand extends BaseCommand {
if (txState != null) {
txId = txState.getTxId();
txMgr.rollback();
- sendRollbackReply(msg, servConn);
+ sendRollbackReply(clientMessage, serverConnection);
} else {
// could not find TxState in the host server.
// Protect against a failover command received so late,
// and it is removed from the failoverMap due to capacity.
- sendRollbackReply(msg, servConn);
+ sendRollbackReply(clientMessage, serverConnection);
}
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
} finally {
if (logger.isDebugEnabled()) {
logger.debug("TX: removing tx state for {}", txId);
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Size.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Size.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Size.java
index c78f4d9..c4515ab 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Size.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Size.java
@@ -56,18 +56,18 @@ public class Size extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, InterruptedException {
StringBuilder errMessage = new StringBuilder();
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- CacheServerStats stats = servConn.getCacheServerStats();
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
long oldStart = start;
start = DistributionStats.getStatTime();
stats.incReadSizeRequestTime(start - oldStart);
// Retrieve the data from the message parts
- Part regionNamePart = msg.getPart(0);
+ Part regionNamePart = clientMessage.getPart(0);
String regionName = regionNamePart.getString();
if (regionName == null) {
@@ -76,8 +76,9 @@ public class Size extends BaseCommand {
errMessage
.append(LocalizedStrings.BaseCommand__THE_INPUT_REGION_NAME_FOR_THE_0_REQUEST_IS_NULL
.toLocalizedString("size"));
- writeErrorResponse(msg, MessageType.SIZE_ERROR, errMessage.toString(), servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.SIZE_ERROR, errMessage.toString(),
+ serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -85,38 +86,39 @@ public class Size extends BaseCommand {
if (region == null) {
String reason = LocalizedStrings.BaseCommand__0_WAS_NOT_FOUND_DURING_1_REQUEST
.toLocalizedString(regionName, "size");
- writeRegionDestroyedEx(msg, regionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
// Size the entry
try {
this.securityService.authorizeRegionRead(regionName);
- writeSizeResponse(region.size(), msg, servConn);
+ writeSizeResponse(region.size(), clientMessage, serverConnection);
} catch (RegionDestroyedException rde) {
- writeException(msg, rde, false, servConn);
+ writeException(clientMessage, rde, false, serverConnection);
} catch (Exception e) {
// If an interrupted exception is thrown , rethrow it
- checkForInterrupt(servConn, e);
+ checkForInterrupt(serverConnection, e);
// If an exception occurs during the destroy, preserve the connection
- writeException(msg, e, false, servConn);
+ writeException(clientMessage, e, false, serverConnection);
if (e instanceof GemFireSecurityException) {
// Fine logging for security exceptions since these are already
// logged by the security logger
if (logger.isDebugEnabled()) {
- logger.debug("{}: Unexpected Security exception", servConn.getName(), e);
+ logger.debug("{}: Unexpected Security exception", serverConnection.getName(), e);
}
} else {
logger.warn(LocalizedMessage.create(LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION,
- servConn.getName()), e);
+ serverConnection.getName()), e);
}
} finally {
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sent size response for region {}", servConn.getName(), regionName);
+ logger.debug("{}: Sent size response for region {}", serverConnection.getName(),
+ regionName);
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
stats.incWriteSizeResponseTime(DistributionStats.getStatTime() - start);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
index 72eab50..9fc3fd1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
@@ -49,23 +49,23 @@ public class TXFailoverCommand extends BaseCommand {
private TXFailoverCommand() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, ClassNotFoundException, InterruptedException {
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
// Build the TXId for the transaction
InternalDistributedMember client =
- (InternalDistributedMember) servConn.getProxyID().getDistributedMember();
- int uniqId = msg.getTransactionId();
+ (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember();
+ int uniqId = clientMessage.getTransactionId();
if (logger.isDebugEnabled()) {
logger.debug("TX: Transaction {} from {} is failing over to this server", uniqId, client);
}
TXId txId = new TXId(client, uniqId);
- TXManagerImpl mgr = (TXManagerImpl) servConn.getCache().getCacheTransactionManager();
+ TXManagerImpl mgr = (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager();
mgr.waitForCompletingTransaction(txId); // in case it's already completing here in another
// thread
if (mgr.isHostedTxRecentlyCompleted(txId)) {
- writeReply(msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeReply(clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
mgr.removeHostedTXState(txId);
return;
}
@@ -75,7 +75,7 @@ public class TXFailoverCommand extends BaseCommand {
if (!tx.isRealDealLocal()) {
// send message to all peers to find out who hosts the transaction
FindRemoteTXMessageReplyProcessor processor =
- FindRemoteTXMessage.send(servConn.getCache(), txId);
+ FindRemoteTXMessage.send(serverConnection.getCache(), txId);
try {
processor.waitForRepliesUninterruptibly();
} catch (ReplyException e) {
@@ -96,7 +96,7 @@ public class TXFailoverCommand extends BaseCommand {
// bug #42228 and bug #43504 - this cannot return until the current view
// has been installed by all members, so that dlocks are released and
// the same keys can be used in a new transaction by the same client thread
- InternalCache cache = servConn.getCache();
+ InternalCache cache = serverConnection.getCache();
try {
WaitForViewInstallation.send((DistributionManager) cache.getDistributionManager());
} catch (InterruptedException e) {
@@ -110,9 +110,9 @@ public class TXFailoverCommand extends BaseCommand {
}
mgr.saveTXCommitMessageForClientFailover(txId, processor.getTxCommitMessage());
} else {
- writeException(msg, new TransactionDataNodeHasDepartedException(
- "Could not find transaction host for " + txId), false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, new TransactionDataNodeHasDepartedException(
+ "Could not find transaction host for " + txId), false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
mgr.removeHostedTXState(txId);
return;
}
@@ -121,8 +121,8 @@ public class TXFailoverCommand extends BaseCommand {
if (!wasInProgress) {
mgr.setInProgress(false);
}
- writeReply(msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeReply(clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
index 8cedd2c..03270d6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
@@ -15,7 +15,6 @@
package org.apache.geode.internal.cache.tier.sockets.command;
-import org.apache.geode.cache.SynchronizationCommitConflictException;
import org.apache.geode.cache.client.internal.TXSynchronizationOp.CompletionType;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ReplyException;
@@ -54,7 +53,8 @@ public class TXSynchronizationCommand extends BaseCommand {
* org.apache.geode.internal.cache.tier.sockets.ServerConnection)
*/
@Override
- protected boolean shouldMasqueradeForTx(Message msg, ServerConnection servConn) {
+ protected boolean shouldMasqueradeForTx(Message clientMessage,
+ ServerConnection serverConnection) {
// masquerading is done in the waiting thread pool
return false;
}
@@ -68,26 +68,28 @@ public class TXSynchronizationCommand extends BaseCommand {
* long)
*/
@Override
- public void cmdExecute(final Message msg, final ServerConnection servConn, long start)
- throws IOException, ClassNotFoundException, InterruptedException {
+ public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection,
+ long start) throws IOException, ClassNotFoundException, InterruptedException {
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
- CompletionType type = CompletionType.values()[msg.getPart(0).getInt()];
- /* int txIdInt = */ msg.getPart(1).getInt(); // [bruce] not sure if we need to transmit this
+ CompletionType type = CompletionType.values()[clientMessage.getPart(0).getInt()];
+ /* int txIdInt = */ clientMessage.getPart(1).getInt(); // [bruce] not sure if we need to
+ // transmit this
final Part statusPart;
if (type == CompletionType.AFTER_COMPLETION) {
- statusPart = msg.getPart(2);
+ statusPart = clientMessage.getPart(2);
} else {
statusPart = null;
}
- final TXManagerImpl txMgr = (TXManagerImpl) servConn.getCache().getCacheTransactionManager();
+ final TXManagerImpl txMgr =
+ (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager();
final InternalDistributedMember member =
- (InternalDistributedMember) servConn.getProxyID().getDistributedMember();
+ (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember();
// get the tx state without associating it with this thread. That's done later
- final TXStateProxy txProxy = txMgr.masqueradeAs(msg, member, true);
+ final TXStateProxy txProxy = txMgr.masqueradeAs(clientMessage, member, true);
// we have to run beforeCompletion and afterCompletion in the same thread
// because beforeCompletion obtains locks for the thread and afterCompletion
@@ -102,21 +104,21 @@ public class TXSynchronizationCommand extends BaseCommand {
TXStateProxy txState = null;
Throwable failureException = null;
try {
- txState = txMgr.masqueradeAs(msg, member, false);
+ txState = txMgr.masqueradeAs(clientMessage, member, false);
if (isDebugEnabled) {
logger.debug("Executing beforeCompletion() notification for transaction {}",
- msg.getTransactionId());
+ clientMessage.getTransactionId());
}
txState.setIsJTA(true);
txState.beforeCompletion();
try {
- writeReply(msg, servConn);
+ writeReply(clientMessage, serverConnection);
} catch (IOException e) {
if (isDebugEnabled) {
logger.debug("Problem writing reply to client", e);
}
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
} catch (ReplyException e) {
failureException = e.getCause();
} catch (InterruptedException e) {
@@ -128,13 +130,13 @@ public class TXSynchronizationCommand extends BaseCommand {
}
if (failureException != null) {
try {
- writeException(msg, failureException, false, servConn);
+ writeException(clientMessage, failureException, false, serverConnection);
} catch (IOException ioe) {
if (isDebugEnabled) {
logger.debug("Problem writing reply to client", ioe);
}
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
}
}
};
@@ -150,11 +152,11 @@ public class TXSynchronizationCommand extends BaseCommand {
public void run() {
TXStateProxy txState = null;
try {
- txState = txMgr.masqueradeAs(msg, member, false);
+ txState = txMgr.masqueradeAs(clientMessage, member, false);
int status = statusPart.getInt();
if (isDebugEnabled) {
logger.debug("Executing afterCompletion({}) notification for transaction {}",
- status, msg.getTransactionId());
+ status, clientMessage.getTransactionId());
}
txState.setIsJTA(true);
txState.afterCompletion(status);
@@ -162,7 +164,7 @@ public class TXSynchronizationCommand extends BaseCommand {
// where it can be applied to the local cache
TXCommitMessage cmsg = txState.getCommitMessage();
try {
- CommitCommand.writeCommitResponse(cmsg, msg, servConn);
+ CommitCommand.writeCommitResponse(cmsg, clientMessage, serverConnection);
txMgr.removeHostedTXState(txState.getTxId());
} catch (IOException e) {
// not much can be done here
@@ -170,16 +172,16 @@ public class TXSynchronizationCommand extends BaseCommand {
logger.warn("Problem writing reply to client", e);
}
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
} catch (RuntimeException e) {
try {
- writeException(msg, e, false, servConn);
+ writeException(clientMessage, e, false, serverConnection);
} catch (IOException ioe) {
if (isDebugEnabled) {
logger.debug("Problem writing reply to client", ioe);
}
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
@@ -195,12 +197,12 @@ public class TXSynchronizationCommand extends BaseCommand {
sync.runSecondRunnable(afterCompletion);
} else {
if (statusPart.getInt() == Status.STATUS_COMMITTED) {
- TXStateProxy txState = txMgr.masqueradeAs(msg, member, false);
+ TXStateProxy txState = txMgr.masqueradeAs(clientMessage, member, false);
try {
if (isDebugEnabled) {
logger.debug(
"Executing beforeCompletion() notification for transaction {} after failover",
- msg.getTransactionId());
+ clientMessage.getTransactionId());
}
txState.setIsJTA(true);
txState.beforeCompletion();
@@ -212,8 +214,8 @@ public class TXSynchronizationCommand extends BaseCommand {
}
}
} catch (Exception e) {
- writeException(msg, MessageType.EXCEPTION, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, MessageType.EXCEPTION, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
}
if (isDebugEnabled) {
logger.debug("Sent tx synchronization response");
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterest.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterest.java
index 7dbb78f..199ac18 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterest.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterest.java
@@ -45,43 +45,44 @@ public class UnregisterInterest extends BaseCommand {
UnregisterInterest() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws ClassNotFoundException, IOException {
Part regionNamePart = null, keyPart = null;
String regionName = null;
Object key = null;
int interestType = 0;
StringId errMessage = null;
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
- regionNamePart = msg.getPart(0);
- interestType = msg.getPart(1).getInt();
- keyPart = msg.getPart(2);
- Part isClosingPart = msg.getPart(3);
+ regionNamePart = clientMessage.getPart(0);
+ interestType = clientMessage.getPart(1).getInt();
+ keyPart = clientMessage.getPart(2);
+ Part isClosingPart = clientMessage.getPart(3);
byte[] isClosingPartBytes = (byte[]) isClosingPart.getObject();
boolean isClosing = isClosingPartBytes[0] == 0x01;
regionName = regionNamePart.getString();
try {
key = keyPart.getStringOrObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
boolean keepalive = false;
try {
- Part keepalivePart = msg.getPart(4);
+ Part keepalivePart = clientMessage.getPart(4);
byte[] keepaliveBytes = (byte[]) keepalivePart.getObject();
keepalive = keepaliveBytes[0] != 0x00;
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Received unregister interest request ({} bytes) from {} for region {} key {}",
- servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key);
+ serverConnection.getName(), clientMessage.getPayloadLength(),
+ serverConnection.getSocketString(), regionName, key);
}
// Process the unregister interest request
@@ -95,9 +96,10 @@ public class UnregisterInterest extends BaseCommand {
errMessage =
LocalizedStrings.UnRegisterInterest_THE_INPUT_REGION_NAME_FOR_THE_UNREGISTER_INTEREST_REQUEST_IS_NULL;
String s = errMessage.toLocalizedString();
- logger.warn("{}: {}", servConn.getName(), s);
- writeErrorResponse(msg, MessageType.UNREGISTER_INTEREST_DATA_ERROR, s, servConn);
- servConn.setAsTrue(RESPONDED);
+ logger.warn("{}: {}", serverConnection.getName(), s);
+ writeErrorResponse(clientMessage, MessageType.UNREGISTER_INTEREST_DATA_ERROR, s,
+ serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -108,12 +110,12 @@ public class UnregisterInterest extends BaseCommand {
this.securityService.authorizeRegionRead(regionName, key.toString());
}
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
if (!DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
try {
@@ -121,8 +123,8 @@ public class UnregisterInterest extends BaseCommand {
authzRequest.unregisterInterestAuthorize(regionName, key, interestType);
key = unregisterContext.getKey();
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
@@ -141,18 +143,18 @@ public class UnregisterInterest extends BaseCommand {
*/
// Unregister interest irrelevent of whether the region is present it or
// not
- servConn.getAcceptor().getCacheClientNotifier().unregisterClientInterest(regionName, key,
- interestType, isClosing, servConn.getProxyID(), keepalive);
+ serverConnection.getAcceptor().getCacheClientNotifier().unregisterClientInterest(regionName,
+ key, interestType, isClosing, serverConnection.getProxyID(), keepalive);
// Update the statistics and write the reply
// bserverStats.incLong(processDestroyTimeId,
// DistributionStats.getStatTime() - start);
// start = DistributionStats.getStatTime();
- writeReply(msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeReply(clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sent unregister interest response for region {} key {}", servConn.getName(),
- regionName, key);
+ logger.debug("{}: Sent unregister interest response for region {} key {}",
+ serverConnection.getName(), regionName, key);
}
// bserverStats.incLong(writeDestroyResponseTimeId,
// DistributionStats.getStatTime() - start);
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterestList.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterestList.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterestList.java
index 7369587..1968bff 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterestList.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterestList.java
@@ -46,48 +46,48 @@ public class UnregisterInterestList extends BaseCommand {
private UnregisterInterestList() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, ClassNotFoundException {
Part regionNamePart = null, keyPart = null, numberOfKeysPart = null;
String regionName = null;
Object key = null;
List keys = null;
int numberOfKeys = 0, partNumber = 0;
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
// bserverStats.incLong(readDestroyRequestTimeId,
// DistributionStats.getStatTime() - start);
// bserverStats.incInt(destroyRequestsId, 1);
// start = DistributionStats.getStatTime();
// Retrieve the data from the message parts
- regionNamePart = msg.getPart(0);
+ regionNamePart = clientMessage.getPart(0);
regionName = regionNamePart.getString();
- Part isClosingListPart = msg.getPart(1);
+ Part isClosingListPart = clientMessage.getPart(1);
byte[] isClosingListPartBytes = (byte[]) isClosingListPart.getObject();
boolean isClosingList = isClosingListPartBytes[0] == 0x01;
boolean keepalive = false;
try {
- Part keepalivePart = msg.getPart(2);
+ Part keepalivePart = clientMessage.getPart(2);
byte[] keepalivePartBytes = (byte[]) keepalivePart.getObject();
keepalive = keepalivePartBytes[0] == 0x01;
} catch (Exception e) {
- writeChunkedException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeChunkedException(clientMessage, e, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- numberOfKeysPart = msg.getPart(3);
+ numberOfKeysPart = clientMessage.getPart(3);
numberOfKeys = numberOfKeysPart.getInt();
partNumber = 4;
keys = new ArrayList();
for (int i = 0; i < numberOfKeys; i++) {
- keyPart = msg.getPart(partNumber + i);
+ keyPart = clientMessage.getPart(partNumber + i);
try {
key = keyPart.getStringOrObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
keys.add(key);
@@ -95,8 +95,8 @@ public class UnregisterInterestList extends BaseCommand {
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Received unregister interest request ({} bytes) from {} for the following {} keys in region {}: {}",
- servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), numberOfKeys,
- regionName, keys);
+ serverConnection.getName(), clientMessage.getPayloadLength(),
+ serverConnection.getSocketString(), numberOfKeys, regionName, keys);
}
// Process the unregister interest request
@@ -113,22 +113,23 @@ public class UnregisterInterestList extends BaseCommand {
LocalizedStrings.UnRegisterInterest_THE_INPUT_REGION_NAME_FOR_THE_UNREGISTER_INTEREST_REQUEST_IS_NULL;
}
String s = errMessage.toLocalizedString();
- logger.warn("{}: {}", servConn.getName(), s);
- writeErrorResponse(msg, MessageType.UNREGISTER_INTEREST_DATA_ERROR, s, servConn);
- servConn.setAsTrue(RESPONDED);
+ logger.warn("{}: {}", serverConnection.getName(), s);
+ writeErrorResponse(clientMessage, MessageType.UNREGISTER_INTEREST_DATA_ERROR, s,
+ serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
try {
this.securityService.authorizeRegionRead(regionName);
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
if (!DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
try {
@@ -136,8 +137,8 @@ public class UnregisterInterestList extends BaseCommand {
authzRequest.unregisterInterestListAuthorize(regionName, keys);
keys = (List) unregisterContext.getKey();
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
@@ -155,20 +156,20 @@ public class UnregisterInterestList extends BaseCommand {
* responded = true; } else {
*/
// Register interest
- servConn.getAcceptor().getCacheClientNotifier().unregisterClientInterest(regionName, keys,
- isClosingList, servConn.getProxyID(), keepalive);
+ serverConnection.getAcceptor().getCacheClientNotifier().unregisterClientInterest(regionName,
+ keys, isClosingList, serverConnection.getProxyID(), keepalive);
// Update the statistics and write the reply
// bserverStats.incLong(processDestroyTimeId,
// DistributionStats.getStatTime() - start);
// start = DistributionStats.getStatTime(); WHY ARE GETTING START AND NOT
// USING IT?
- writeReply(msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeReply(clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Sent unregister interest response for the following {} keys in region {}: {}",
- servConn.getName(), numberOfKeys, regionName, keys);
+ serverConnection.getName(), numberOfKeys, regionName, keys);
}
// bserverStats.incLong(writeDestroyResponseTimeId,
// DistributionStats.getStatTime() - start);
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UpdateClientNotification.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UpdateClientNotification.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UpdateClientNotification.java
index 57aca22..2f434fb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UpdateClientNotification.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UpdateClientNotification.java
@@ -35,8 +35,9 @@ public class UpdateClientNotification extends BaseCommand {
private UpdateClientNotification() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
- CacheServerStats stats = servConn.getCacheServerStats();
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
+ throws IOException {
+ CacheServerStats stats = serverConnection.getCacheServerStats();
{
long oldStart = start;
start = DistributionStats.getStatTime();
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 4e450c7..1afe6ff 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -2149,7 +2149,7 @@ public class Connection implements Runnable {
logger.fatal(LocalizedMessage
.create(LocalizedStrings.Connection_FAILED_HANDLING_CHUNK_MESSAGE), ex);
}
- } else /* (msgType == END_CHUNKED_MSG_TYPE) */ {
+ } else /* (messageType == END_CHUNKED_MSG_TYPE) */ {
MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion);
this.owner.getConduit().stats.incMessagesBeingReceived(md.size() == 0, len);
try {
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java b/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java
index 031f827..80b16fc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java
@@ -30,8 +30,7 @@ import java.io.ObjectStreamClass;
/**
* Reusable Input/Output operation utility methods.
- * <p/>
- *
+ *
* @since GemFire 6.6
*/
@SuppressWarnings("unused")
@@ -44,8 +43,7 @@ public abstract class IOUtils {
* File.separator character. If the pathname is unspecified (null, empty or blank) then path
* elements are considered relative to file system root, beginning with File.separator. If array
* of path elements are null, then the pathname is returned as is.
- * </p>
- *
+ *
* @param pathname a String value indicating the base pathname.
* @param pathElements the path elements to append to pathname.
* @return the path elements appended to the pathname.
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java b/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java
new file mode 100644
index 0000000..017e0f5
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed;
+
+import org.apache.geode.cache.Cache;
+
+/**
+ * Provides tests a way to access non-public state in ServerLauncher
+ */
+public class ServerLauncherUtils {
+
+ /**
+ * Returns the Cache from an online in-process ServerLauncher instance
+ */
+ public static Cache getCache(final ServerLauncher serverLauncher) {
+ return serverLauncher.getCache();
+ }
+}
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java
index 39aa1e6..b529f0c 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java
@@ -14,166 +14,141 @@
*/
package org.apache.geode.internal.cache.ha;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.*;
-import java.io.IOException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.awaitility.Awaitility;
-
-import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.apache.geode.cache.CacheException;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
import org.apache.geode.test.junit.categories.IntegrationTest;
/**
* Test runs all tests of HARegionQueueJUnitTest using BlockingHARegionQueue instead of
* HARegionQueue.
- *
- *
*/
@Category({IntegrationTest.class, ClientSubscriptionTest.class})
public class BlockingHARegionQueueJUnitTest extends HARegionQueueJUnitTest {
- /**
- * Creates Blocking HA region-queue object
- *
- * @return Blocking HA region-queue object
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws CacheException
- * @throws InterruptedException
- */
- protected HARegionQueue createHARegionQueue(String name)
- throws IOException, ClassNotFoundException, CacheException, InterruptedException {
- HARegionQueue regionqueue =
- HARegionQueue.getHARegionQueueInstance(name, cache, HARegionQueue.BLOCKING_HA_QUEUE, false);
- return regionqueue;
- }
-
- /**
- * Creates Blocking HA region-queue object
- *
- * @return Blocking HA region-queue object
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws CacheException
- * @throws InterruptedException
- */
- protected HARegionQueue createHARegionQueue(String name, HARegionQueueAttributes attrs)
- throws IOException, ClassNotFoundException, CacheException, InterruptedException {
- HARegionQueue regionqueue = HARegionQueue.getHARegionQueueInstance(name, cache, attrs,
- HARegionQueue.BLOCKING_HA_QUEUE, false);
- return regionqueue;
+ @Override
+ protected int queueType() {
+ return HARegionQueue.BLOCKING_HA_QUEUE;
}
/**
* Tests the effect of a put which is blocked because of capacity constraint & subsequent passage
* because of take operation
- *
*/
@Test
- public void testBlockingPutAndTake()
- throws InterruptedException, IOException, ClassNotFoundException {
+ public void testBlockingPutAndTake() throws Exception {
HARegionQueueAttributes hrqa = new HARegionQueueAttributes();
hrqa.setBlockingQueueCapacity(1);
- final HARegionQueue hrq = this.createHARegionQueue("testBlockingPutAndTake", hrqa);
- hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only.
+
+ HARegionQueue hrq = createHARegionQueue(this.testName.getMethodName(), hrqa);
+ hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only.
+
EventID id1 = new EventID(new byte[] {1}, 1, 1);
hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing"));
- Thread t1 = new Thread(new Runnable() {
- public void run() {
- try {
- EventID id2 = new EventID(new byte[] {1}, 1, 2);
- hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing"));
- } catch (Exception e) {
- encounteredException = true;
- }
+
+ AtomicBoolean threadStarted = new AtomicBoolean(false);
+
+ Thread thread = new Thread(() -> {
+ try {
+ threadStarted.set(true);
+ EventID id2 = new EventID(new byte[] {1}, 1, 2);
+ hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing"));
+ } catch (InterruptedException e) {
+ errorCollector.addError(e);
}
});
- t1.start();
- Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> t1.isAlive());
+ thread.start();
+
+ Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> threadStarted.get());
+
Conflatable conf = (Conflatable) hrq.take();
- assertNotNull(conf);
- Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !t1.isAlive());
+ assertThat(conf, notNullValue());
+
+ Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !thread.isAlive());
}
/**
* Test Scenario : BlockingQueue capacity is 1. The first put should be successful. The second put
* should block till a peek/remove happens.
- *
*/
@Test
- public void testBlockingPutAndPeekRemove()
- throws InterruptedException, IOException, ClassNotFoundException {
+ public void testBlockingPutAndPeekRemove() throws Exception {
HARegionQueueAttributes hrqa = new HARegionQueueAttributes();
hrqa.setBlockingQueueCapacity(1);
- final HARegionQueue hrq = this.createHARegionQueue("testBlockingPutAndPeekRemove", hrqa);
+
+ HARegionQueue hrq = createHARegionQueue(this.testName.getMethodName(), hrqa);
hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only.
+
EventID id1 = new EventID(new byte[] {1}, 1, 1);
hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing"));
- Thread t1 = new Thread(new Runnable() {
- public void run() {
- try {
- EventID id2 = new EventID(new byte[] {1}, 1, 2);
- hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing"));
- } catch (Exception e) {
- encounteredException = true;
- }
+
+ AtomicBoolean threadStarted = new AtomicBoolean(false);
+
+ Thread thread = new Thread(() -> {
+ try {
+ threadStarted.set(true);
+ EventID id2 = new EventID(new byte[] {1}, 1, 2);
+ hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing"));
+ } catch (Exception e) {
+ errorCollector.addError(e);
}
});
- t1.start();
- Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> t1.isAlive());
+ thread.start();
+
+ Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> threadStarted.get());
+
Conflatable conf = (Conflatable) hrq.peek();
- assertNotNull(conf);
+ assertThat(conf, notNullValue());
+
hrq.remove();
- Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !t1.isAlive());
- assertFalse("Exception occurred in put-thread", encounteredException);
+ Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !thread.isAlive());
}
/**
* Test Scenario :Blocking Queue capacity is 1. The first put should be successful.The second put
* should block till the first put expires.
- *
+ * <p>
+ * fix for 40314 - capacity constraint is checked for primary only and expiry is not applicable on
+ * primary so marking this test as invalid.
*/
- // fix for 40314 - capacity constraint is checked for primary only and
- // expiry is not applicable on primary so marking this test as invalid.
- @Ignore
@Test
- public void testBlockingPutAndExpiry()
- throws InterruptedException, IOException, ClassNotFoundException {
+ public void testBlockingPutAndExpiry() throws Exception {
HARegionQueueAttributes hrqa = new HARegionQueueAttributes();
hrqa.setBlockingQueueCapacity(1);
hrqa.setExpiryTime(1);
- final HARegionQueue hrq = this.createHARegionQueue("testBlockingPutAndExpiry", hrqa);
+
+ HARegionQueue hrq = this.createHARegionQueue(this.testName.getMethodName(), hrqa);
EventID id1 = new EventID(new byte[] {1}, 1, 1);
- long start = System.currentTimeMillis();
+
hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing"));
- Thread t1 = new Thread(new Runnable() {
- public void run() {
- try {
- EventID id2 = new EventID(new byte[] {1}, 1, 2);
- hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing"));
- } catch (Exception e) {
- encounteredException = true;
- }
+
+ AtomicBoolean threadStarted = new AtomicBoolean(false);
+
+ Thread thread = new Thread(() -> {
+ try {
+ threadStarted.set(true);
+ EventID id2 = new EventID(new byte[] {1}, 1, 2);
+ hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing"));
+ } catch (Exception e) {
+ errorCollector.addError(e);
}
});
- t1.start();
- Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> t1.isAlive());
- waitAtLeast(1000, start, () -> {
- assertFalse("Put-thread blocked unexpectedly", t1.isAlive());
- });
- assertFalse("Exception occurred in put-thread", encounteredException);
+ thread.start();
+
+ Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> threadStarted.get());
+
+ Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !thread.isAlive());
}
}