You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2017/10/04 18:10:47 UTC
[geode] branch feature/GEODE-3730 updated: GEODE-3730: Moved retry
loop around switch statement
This is an automated email from the ASF dual-hosted git repository.
boglesby pushed a commit to branch feature/GEODE-3730
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-3730 by this push:
new 6e00da4 GEODE-3730: Moved retry loop around switch statement
6e00da4 is described below
commit 6e00da40cf4868a62e5964b98df500bfc57f9521
Author: Barry Oglesby <bo...@pivotal.io>
AuthorDate: Tue Oct 3 09:19:55 2017 -0700
GEODE-3730: Moved retry loop around switch statement
---
.../sockets/command/GatewayReceiverCommand.java | 662 +++++++++++----------
.../internal/cache/wan/GatewayReceiverStats.java | 22 +-
.../KeepEventsOnGatewaySenderQueueDUnitTest.java | 69 ++-
3 files changed, 425 insertions(+), 328 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
index 0b928f2..33043b2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
@@ -169,7 +169,7 @@ public class GatewayReceiverCommand extends BaseCommand {
// event received in batch also have PDX events at the start of the batch,to
// represent correct index on which the exception occurred, number of PDX
- // events need to be subtratced.
+ // events need to be subtracted.
int indexWithoutPDXEvent = -1; //
for (int i = 0; i < numberOfEvents; i++) {
boolean retry = true;
@@ -186,127 +186,140 @@ public class GatewayReceiverCommand extends BaseCommand {
boolean callbackArgExists = false;
try {
- Part possibleDuplicatePart = clientMessage.getPart(partNumber + 1);
- byte[] possibleDuplicatePartBytes;
- try {
- possibleDuplicatePartBytes = (byte[]) possibleDuplicatePart.getObject();
- } catch (Exception e) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS,
- new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
- Integer.valueOf(numberOfEvents)}),
- e);
- throw e;
- }
- boolean possibleDuplicate = possibleDuplicatePartBytes[0] == 0x01;
-
- // Make sure instance variables are null before each iteration
- regionName = null;
- key = null;
- callbackArg = null;
-
- // Retrieve the region name from the message parts
- regionNamePart = clientMessage.getPart(partNumber + 2);
- regionName = regionNamePart.getString();
- if (regionName.equals(PeerTypeRegistration.REGION_FULL_PATH)) {
- indexWithoutPDXEvent--;
- isPdxEvent = true;
- }
+ do {
+ if (isPdxEvent) {
+ // This is a retried event. Reset the PDX event index.
+ indexWithoutPDXEvent++;
+ }
+ isPdxEvent = false;
+ Part possibleDuplicatePart = clientMessage.getPart(partNumber + 1);
+ byte[] possibleDuplicatePartBytes;
+ try {
+ possibleDuplicatePartBytes = (byte[]) possibleDuplicatePart.getObject();
+ } catch (Exception e) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS,
+ new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
+ Integer.valueOf(numberOfEvents)}),
+ e);
+ handleException(removeOnException, stats, e);
+ break;
+ }
+ boolean possibleDuplicate = possibleDuplicatePartBytes[0] == 0x01;
+
+ // Make sure instance variables are null before each iteration
+ regionName = null;
+ key = null;
+ callbackArg = null;
+
+ // Retrieve the region name from the message parts
+ regionNamePart = clientMessage.getPart(partNumber + 2);
+ regionName = regionNamePart.getString();
+ if (regionName.equals(PeerTypeRegistration.REGION_FULL_PATH)) {
+ indexWithoutPDXEvent--;
+ isPdxEvent = true;
+ }
- // Retrieve the event id from the message parts
- // This was going to be used to determine possible
- // duplication of events, but it is unused now. In
- // fact the event id is overridden by the FROM_GATEWAY
- // token.
- Part eventIdPart = clientMessage.getPart(partNumber + 3);
- eventIdPart.setVersion(serverConnection.getClientVersion());
- // String eventId = eventIdPart.getString();
- try {
- eventId = (EventID) eventIdPart.getObject();
- } catch (Exception e) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS,
- new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
- Integer.valueOf(numberOfEvents)}),
- e);
- throw e;
- }
+ // Retrieve the event id from the message parts
+ // This was going to be used to determine possible
+ // duplication of events, but it is unused now. In
+ // fact the event id is overridden by the FROM_GATEWAY
+ // token.
+ Part eventIdPart = clientMessage.getPart(partNumber + 3);
+ eventIdPart.setVersion(serverConnection.getClientVersion());
+ // String eventId = eventIdPart.getString();
+ try {
+ eventId = (EventID) eventIdPart.getObject();
+ } catch (Exception e) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS,
+ new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
+ Integer.valueOf(numberOfEvents)}),
+ e);
+ handleException(removeOnException, stats, e);
+ break;
+ }
- // Retrieve the key from the message parts
- keyPart = clientMessage.getPart(partNumber + 4);
- try {
- key = keyPart.getStringOrObject();
- } catch (Exception e) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS,
- new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
- Integer.valueOf(numberOfEvents)}),
- e);
- throw e;
- }
- switch (actionType) {
- case 0: // Create
-
- /*
- * CLIENT EXCEPTION HANDLING TESTING CODE String keySt = (String) key;
- * System.out.println("Processing new key: " + key); if (keySt.startsWith("failure")) {
- * throw new Exception(LocalizedStrings
- * .ProcessBatch_THIS_EXCEPTION_REPRESENTS_A_FAILURE_ON_THE_SERVER
- * .toLocalizedString()); }
- */
-
- // Retrieve the value from the message parts (do not deserialize it)
- valuePart = clientMessage.getPart(partNumber + 5);
- // try {
- // logger.warn(getName() + ": Creating key " + key + " value " +
- // valuePart.getObject());
- // } catch (Exception e) {}
-
- // Retrieve the callbackArg from the message parts if necessary
- int index = partNumber + 6;
- callbackArgExistsPart = clientMessage.getPart(index++); {
- byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
- callbackArgExists = partBytes[0] == 0x01;
+ // Retrieve the key from the message parts
+ keyPart = clientMessage.getPart(partNumber + 4);
+ try {
+ key = keyPart.getStringOrObject();
+ } catch (Exception e) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS,
+ new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
+ Integer.valueOf(numberOfEvents)}),
+ e);
+ handleException(removeOnException, stats, e);
+ break;
}
- if (callbackArgExists) {
- callbackArgPart = clientMessage.getPart(index++);
- try {
- callbackArg = callbackArgPart.getObject();
- } catch (Exception e) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_CREATE_REQUEST_1_FOR_2_EVENTS,
- new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
- Integer.valueOf(numberOfEvents)}),
- e);
- throw e;
- }
- }
- if (logger.isDebugEnabled()) {
- logger.debug(
- "{}: Processing batch create request {} on {} for region {} key {} value {} callbackArg {}, eventId={}",
- serverConnection.getName(), batchId, serverConnection.getSocketString(),
- regionName, key, valuePart, callbackArg, eventId);
- }
- versionTimeStamp = clientMessage.getPart(index++).getLong();
- // Process the create request
- if (key == null || regionName == null) {
- StringId message = null;
- Object[] messageArgs =
- new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
- if (key == null) {
- message =
- LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_CREATE_REQUEST_1_IS_NULL;
- }
- if (regionName == null) {
- message =
- LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_CREATE_REQUEST_1_IS_NULL;
- }
- String s = message.toLocalizedString(messageArgs);
- logger.warn(s);
- throw new Exception(s);
- }
- do {
+ int index = -1;
+ switch (actionType) {
+ case 0: // Create
try {
+
+ /*
+ * CLIENT EXCEPTION HANDLING TESTING CODE String keySt = (String) key;
+ * System.out.println("Processing new key: " + key); if
+ * (keySt.startsWith("failure")) { throw new Exception(LocalizedStrings
+ * .ProcessBatch_THIS_EXCEPTION_REPRESENTS_A_FAILURE_ON_THE_SERVER
+ * .toLocalizedString()); }
+ */
+
+ // Retrieve the value from the message parts (do not deserialize it)
+ valuePart = clientMessage.getPart(partNumber + 5);
+ // try {
+ // logger.warn(getName() + ": Creating key " + key + " value " +
+ // valuePart.getObject());
+ // } catch (Exception e) {}
+
+ // Retrieve the callbackArg from the message parts if necessary
+ index = partNumber + 6;
+ callbackArgExistsPart = clientMessage.getPart(index++);
+ {
+ byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
+ callbackArgExists = partBytes[0] == 0x01;
+ }
+ if (callbackArgExists) {
+ callbackArgPart = clientMessage.getPart(index++);
+ try {
+ callbackArg = callbackArgPart.getObject();
+ } catch (Exception e) {
+ logger
+ .warn(
+ LocalizedMessage
+ .create(
+ LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_CREATE_REQUEST_1_FOR_2_EVENTS,
+ new Object[] {serverConnection.getName(),
+ Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}),
+ e);
+ throw e;
+ }
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "{}: Processing batch create request {} on {} for region {} key {} value {} callbackArg {}, eventId={}",
+ serverConnection.getName(), batchId, serverConnection.getSocketString(),
+ regionName, key, valuePart, callbackArg, eventId);
+ }
+ versionTimeStamp = clientMessage.getPart(index++).getLong();
+ // Process the create request
+ if (key == null || regionName == null) {
+ StringId message = null;
+ Object[] messageArgs =
+ new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
+ if (key == null) {
+ message =
+ LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_CREATE_REQUEST_1_IS_NULL;
+ }
+ if (regionName == null) {
+ message =
+ LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_CREATE_REQUEST_1_IS_NULL;
+ }
+ String s = message.toLocalizedString(messageArgs);
+ logger.warn(s);
+ throw new Exception(s);
+ }
region = (LocalRegion) crHelper.getRegion(regionName);
if (region == null) {
handleRegionNull(serverConnection, regionName, batchId);
@@ -365,71 +378,73 @@ public class GatewayReceiverCommand extends BaseCommand {
new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
Integer.valueOf(numberOfEvents)}),
e);
- handleException(removeOnException, e);
+ handleException(removeOnException, stats, e);
}
- } while (retry);
- break;
- case 1: // Update
- /*
- * CLIENT EXCEPTION HANDLING TESTING CODE keySt = (String) key;
- * System.out.println("Processing updated key: " + key); if
- * (keySt.startsWith("failure")) { throw new Exception(LocalizedStrings
- * .ProcessBatch_THIS_EXCEPTION_REPRESENTS_A_FAILURE_ON_THE_SERVER
- * .toLocalizedString()); }
- */
-
- // Retrieve the value from the message parts (do not deserialize it)
- valuePart = clientMessage.getPart(partNumber + 5);
- // try {
- // logger.warn(getName() + ": Updating key " + key + " value " +
- // valuePart.getObject());
- // } catch (Exception e) {}
-
- // Retrieve the callbackArg from the message parts if necessary
- index = partNumber + 6;
- callbackArgExistsPart = clientMessage.getPart(index++); {
- byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
- callbackArgExists = partBytes[0] == 0x01;
- }
- if (callbackArgExists) {
- callbackArgPart = clientMessage.getPart(index++);
- try {
- callbackArg = callbackArgPart.getObject();
- } catch (Exception e) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_REQUEST_1_CONTAINING_2_EVENTS,
- new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
- Integer.valueOf(numberOfEvents)}),
- e);
- throw e;
- }
- }
- versionTimeStamp = clientMessage.getPart(index++).getLong();
- if (logger.isDebugEnabled()) {
- logger.debug(
- "{}: Processing batch update request {} on {} for region {} key {} value {} callbackArg {}",
- serverConnection.getName(), batchId, serverConnection.getSocketString(),
- regionName, key, valuePart, callbackArg);
- }
- // Process the update request
- if (key == null || regionName == null) {
- StringId message = null;
- Object[] messageArgs =
- new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
- if (key == null) {
- message =
- LocalizedStrings.ProcessBatch_0_THE_INPUT_KEY_FOR_THE_BATCH_UPDATE_REQUEST_1_IS_NULL;
- }
- if (regionName == null) {
- message =
- LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_UPDATE_REQUEST_1_IS_NULL;
- }
- String s = message.toLocalizedString(messageArgs);
- logger.warn(s);
- throw new Exception(s);
- }
- do {
+ break;
+ case 1: // Update
try {
+ /*
+ * CLIENT EXCEPTION HANDLING TESTING CODE keySt = (String) key;
+ * System.out.println("Processing updated key: " + key); if
+ * (keySt.startsWith("failure")) { throw new Exception(LocalizedStrings
+ * .ProcessBatch_THIS_EXCEPTION_REPRESENTS_A_FAILURE_ON_THE_SERVER
+ * .toLocalizedString()); }
+ */
+
+ // Retrieve the value from the message parts (do not deserialize it)
+ valuePart = clientMessage.getPart(partNumber + 5);
+ // try {
+ // logger.warn(getName() + ": Updating key " + key + " value " +
+ // valuePart.getObject());
+ // } catch (Exception e) {}
+
+ // Retrieve the callbackArg from the message parts if necessary
+ index = partNumber + 6;
+ callbackArgExistsPart = clientMessage.getPart(index++);
+ {
+ byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
+ callbackArgExists = partBytes[0] == 0x01;
+ }
+ if (callbackArgExists) {
+ callbackArgPart = clientMessage.getPart(index++);
+ try {
+ callbackArg = callbackArgPart.getObject();
+ } catch (Exception e) {
+ logger
+ .warn(
+ LocalizedMessage
+ .create(
+ LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_REQUEST_1_CONTAINING_2_EVENTS,
+ new Object[] {serverConnection.getName(),
+ Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}),
+ e);
+ throw e;
+ }
+ }
+ versionTimeStamp = clientMessage.getPart(index++).getLong();
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "{}: Processing batch update request {} on {} for region {} key {} value {} callbackArg {}",
+ serverConnection.getName(), batchId, serverConnection.getSocketString(),
+ regionName, key, valuePart, callbackArg);
+ }
+ // Process the update request
+ if (key == null || regionName == null) {
+ StringId message = null;
+ Object[] messageArgs =
+ new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
+ if (key == null) {
+ message =
+ LocalizedStrings.ProcessBatch_0_THE_INPUT_KEY_FOR_THE_BATCH_UPDATE_REQUEST_1_IS_NULL;
+ }
+ if (regionName == null) {
+ message =
+ LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_UPDATE_REQUEST_1_IS_NULL;
+ }
+ String s = message.toLocalizedString(messageArgs);
+ logger.warn(s);
+ throw new Exception(s);
+ }
region = (LocalRegion) crHelper.getRegion(regionName);
if (region == null) {
handleRegionNull(serverConnection, regionName, batchId);
@@ -481,57 +496,59 @@ public class GatewayReceiverCommand extends BaseCommand {
new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
Integer.valueOf(numberOfEvents)}),
e);
- handleException(removeOnException, e);
- }
- } while (retry);
- break;
- case 2: // Destroy
- // Retrieve the callbackArg from the message parts if necessary
- index = partNumber + 5;
- callbackArgExistsPart = clientMessage.getPart(index++); {
- byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
- callbackArgExists = partBytes[0] == 0x01;
- }
- if (callbackArgExists) {
- callbackArgPart = clientMessage.getPart(index++);
- try {
- callbackArg = callbackArgPart.getObject();
- } catch (Exception e) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_DESTROY_REQUEST_1_CONTAINING_2_EVENTS,
- new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
- Integer.valueOf(numberOfEvents)}),
- e);
- throw e;
- }
- }
-
- versionTimeStamp = clientMessage.getPart(index++).getLong();
- if (logger.isDebugEnabled()) {
- logger.debug("{}: Processing batch destroy request {} on {} for region {} key {}",
- serverConnection.getName(), batchId, serverConnection.getSocketString(),
- regionName, key);
- }
-
- // Process the destroy request
- if (key == null || regionName == null) {
- StringId message = null;
- if (key == null) {
- message =
- LocalizedStrings.ProcessBatch_0_THE_INPUT_KEY_FOR_THE_BATCH_DESTROY_REQUEST_1_IS_NULL;
+ handleException(removeOnException, stats, e);
}
- if (regionName == null) {
- message =
- LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_DESTROY_REQUEST_1_IS_NULL;
- }
- Object[] messageArgs =
- new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
- String s = message.toLocalizedString(messageArgs);
- logger.warn(s);
- throw new Exception(s);
- }
- do {
+ break;
+ case 2: // Destroy
try {
+ // Retrieve the callbackArg from the message parts if necessary
+ index = partNumber + 5;
+ callbackArgExistsPart = clientMessage.getPart(index++);
+ {
+ byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
+ callbackArgExists = partBytes[0] == 0x01;
+ }
+ if (callbackArgExists) {
+ callbackArgPart = clientMessage.getPart(index++);
+ try {
+ callbackArg = callbackArgPart.getObject();
+ } catch (Exception e) {
+ logger
+ .warn(
+ LocalizedMessage
+ .create(
+ LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_DESTROY_REQUEST_1_CONTAINING_2_EVENTS,
+ new Object[] {serverConnection.getName(),
+ Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}),
+ e);
+ throw e;
+ }
+ }
+
+ versionTimeStamp = clientMessage.getPart(index++).getLong();
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Processing batch destroy request {} on {} for region {} key {}",
+ serverConnection.getName(), batchId, serverConnection.getSocketString(),
+ regionName, key);
+ }
+
+ // Process the destroy request
+ if (key == null || regionName == null) {
+ StringId message = null;
+ if (key == null) {
+ message =
+ LocalizedStrings.ProcessBatch_0_THE_INPUT_KEY_FOR_THE_BATCH_DESTROY_REQUEST_1_IS_NULL;
+ }
+ if (regionName == null) {
+ message =
+ LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_DESTROY_REQUEST_1_IS_NULL;
+ }
+ Object[] messageArgs =
+ new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
+ String s = message.toLocalizedString(messageArgs);
+ logger.warn(s);
+ throw new Exception(s);
+ }
region = (LocalRegion) crHelper.getRegion(regionName);
if (region == null) {
handleRegionNull(serverConnection, regionName, batchId);
@@ -552,121 +569,120 @@ public class GatewayReceiverCommand extends BaseCommand {
authzRequest.destroyAuthorize(regionName, key, callbackArg);
callbackArg = destroyContext.getCallbackArg();
}
- region.basicBridgeDestroy(key, callbackArg, serverConnection.getProxyID(), false,
- clientEvent);
- serverConnection.setModificationInfo(true, regionName, key);
+ try {
+ region.basicBridgeDestroy(key, callbackArg, serverConnection.getProxyID(),
+ false, clientEvent);
+ serverConnection.setModificationInfo(true, regionName, key);
+ } catch (EntryNotFoundException e) {
+ logger.info(LocalizedMessage.create(
+ LocalizedStrings.ProcessBatch_0_DURING_BATCH_DESTROY_NO_ENTRY_WAS_FOUND_FOR_KEY_1,
+ new Object[] {serverConnection.getName(), key}));
+ // throw new Exception(e);
+ }
stats.incDestroyRequest();
retry = false;
}
- } catch (EntryNotFoundException e) {
- logger.info(LocalizedMessage.create(
- LocalizedStrings.ProcessBatch_0_DURING_BATCH_DESTROY_NO_ENTRY_WAS_FOUND_FOR_KEY_1,
- new Object[] {serverConnection.getName(), key}));
- // throw new Exception(e);
} catch (Exception e) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_DESTROY_REQUEST_1_CONTAINING_2_EVENTS,
new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
Integer.valueOf(numberOfEvents)}),
e);
- handleException(removeOnException, e);
+ handleException(removeOnException, stats, e);
}
- } while (retry);
- break;
- case 3: // Update Time-stamp for a RegionEntry
+ break;
+ case 3: // Update Time-stamp for a RegionEntry
- try {
- // Region name
- regionNamePart = clientMessage.getPart(partNumber + 2);
- regionName = regionNamePart.getString();
+ try {
+ // Region name
+ regionNamePart = clientMessage.getPart(partNumber + 2);
+ regionName = regionNamePart.getString();
- // Retrieve the event id from the message parts
- eventIdPart = clientMessage.getPart(partNumber + 3);
- eventId = (EventID) eventIdPart.getObject();
+ // Retrieve the event id from the message parts
+ eventIdPart = clientMessage.getPart(partNumber + 3);
+ eventId = (EventID) eventIdPart.getObject();
- // Retrieve the key from the message parts
- keyPart = clientMessage.getPart(partNumber + 4);
- key = keyPart.getStringOrObject();
+ // Retrieve the key from the message parts
+ keyPart = clientMessage.getPart(partNumber + 4);
+ key = keyPart.getStringOrObject();
- // Retrieve the callbackArg from the message parts if necessary
- index = partNumber + 5;
- callbackArgExistsPart = clientMessage.getPart(index++);
+ // Retrieve the callbackArg from the message parts if necessary
+ index = partNumber + 5;
+ callbackArgExistsPart = clientMessage.getPart(index++);
- byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
- callbackArgExists = partBytes[0] == 0x01;
+ byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
+ callbackArgExists = partBytes[0] == 0x01;
- if (callbackArgExists) {
- callbackArgPart = clientMessage.getPart(index++);
- callbackArg = callbackArgPart.getObject();
- }
+ if (callbackArgExists) {
+ callbackArgPart = clientMessage.getPart(index++);
+ callbackArg = callbackArgPart.getObject();
+ }
- } catch (Exception e) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_VERSION_REQUEST_1_CONTAINING_2_EVENTS,
- new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
- Integer.valueOf(numberOfEvents)}),
- e);
- throw e;
- }
-
- versionTimeStamp = clientMessage.getPart(index++).getLong();
- if (logger.isDebugEnabled()) {
- logger.debug(
- "{}: Processing batch update-version request {} on {} for region {} key {} value {} callbackArg {}",
- serverConnection.getName(), batchId, serverConnection.getSocketString(),
- regionName, key, valuePart, callbackArg);
- }
- // Process the update time-stamp request
- if (key == null || regionName == null) {
- StringId message =
- LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_VERSION_REQUEST_1_CONTAINING_2_EVENTS;
-
- Object[] messageArgs = new Object[] {serverConnection.getName(),
- Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)};
- String s = message.toLocalizedString(messageArgs);
- logger.warn(s);
- throw new Exception(s);
-
- } else {
- region = (LocalRegion) crHelper.getRegion(regionName);
-
- if (region == null) {
- handleRegionNull(serverConnection, regionName, batchId);
- } else {
-
- clientEvent = new EventIDHolder(eventId);
-
- if (versionTimeStamp > 0) {
- VersionTag tag = VersionTag.create(region.getVersionMember());
- tag.setIsGatewayTag(true);
- tag.setVersionTimeStamp(versionTimeStamp);
- tag.setDistributedSystemId(dsid);
- clientEvent.setVersionTag(tag);
+ versionTimeStamp = clientMessage.getPart(index++).getLong();
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "{}: Processing batch update-version request {} on {} for region {} key {} value {} callbackArg {}",
+ serverConnection.getName(), batchId, serverConnection.getSocketString(),
+ regionName, key, valuePart, callbackArg);
}
+ // Process the update time-stamp request
+ if (key == null || regionName == null) {
+ StringId message =
+ LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_VERSION_REQUEST_1_CONTAINING_2_EVENTS;
+
+ Object[] messageArgs = new Object[] {serverConnection.getName(),
+ Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)};
+ String s = message.toLocalizedString(messageArgs);
+ logger.warn(s);
+ throw new Exception(s);
+
+ } else {
+ region = (LocalRegion) crHelper.getRegion(regionName);
+
+ if (region == null) {
+ handleRegionNull(serverConnection, regionName, batchId);
+ } else {
- // Update the version tag
- try {
+ clientEvent = new EventIDHolder(eventId);
- region.basicBridgeUpdateVersionStamp(key, callbackArg,
- serverConnection.getProxyID(), false, clientEvent);
+ if (versionTimeStamp > 0) {
+ VersionTag tag = VersionTag.create(region.getVersionMember());
+ tag.setIsGatewayTag(true);
+ tag.setVersionTimeStamp(versionTimeStamp);
+ tag.setDistributedSystemId(dsid);
+ clientEvent.setVersionTag(tag);
+ }
- } catch (EntryNotFoundException e) {
- logger.info(LocalizedMessage.create(
- LocalizedStrings.ProcessBatch_0_DURING_BATCH_UPDATE_VERSION_NO_ENTRY_WAS_FOUND_FOR_KEY_1,
- new Object[] {serverConnection.getName(), key}));
- // throw new Exception(e);
+ // Update the version tag
+ try {
+ region.basicBridgeUpdateVersionStamp(key, callbackArg,
+ serverConnection.getProxyID(), false, clientEvent);
+ } catch (EntryNotFoundException e) {
+ logger.info(LocalizedMessage.create(
+ LocalizedStrings.ProcessBatch_0_DURING_BATCH_UPDATE_VERSION_NO_ENTRY_WAS_FOUND_FOR_KEY_1,
+ new Object[] {serverConnection.getName(), key}));
+ }
+ retry = false;
+ }
}
+ } catch (Exception e) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_VERSION_REQUEST_1_CONTAINING_2_EVENTS,
+ new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
+ Integer.valueOf(numberOfEvents)}),
+ e);
+ handleException(removeOnException, stats, e);
}
- }
- break;
- default:
- logger.fatal(LocalizedMessage.create(
- LocalizedStrings.Processbatch_0_UNKNOWN_ACTION_TYPE_1_FOR_BATCH_FROM_2,
- new Object[] {serverConnection.getName(), Integer.valueOf(actionType),
- serverConnection.getSocketString()}));
- stats.incUnknowsOperationsReceived();
- }
+ break;
+ default:
+ logger.fatal(LocalizedMessage.create(
+ LocalizedStrings.Processbatch_0_UNKNOWN_ACTION_TYPE_1_FOR_BATCH_FROM_2,
+ new Object[] {serverConnection.getName(), Integer.valueOf(actionType),
+ serverConnection.getSocketString()}));
+ stats.incUnknowsOperationsReceived();
+ }
+ } while (retry);
} catch (CancelException e) {
if (logger.isDebugEnabled()) {
logger.debug(
@@ -766,10 +782,12 @@ public class GatewayReceiverCommand extends BaseCommand {
return true;
}
- private void handleException(boolean removeOnException, Exception e) throws Exception {
+ private void handleException(boolean removeOnException, GatewayReceiverStats stats, Exception e)
+ throws Exception {
if (shouldThrowException(removeOnException, e)) {
throw e;
} else {
+ stats.incEventsRetried();
Thread.sleep(500);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java
index 0561b4f..6935dc6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java
@@ -56,6 +56,9 @@ public class GatewayReceiverStats extends CacheServerStats {
/** Name of the unprocessed events added by primary statistic */
private static final String EXCEPTIONS_OCCURRED = "exceptionsOccurred";
+ /** Name of the events retried statistic */
+ private static final String EVENTS_RETRIED = "eventsRetried";
+
// /** Id of the events queued statistic */
// private int failoverBatchesReceivedId;
@@ -86,6 +89,9 @@ public class GatewayReceiverStats extends CacheServerStats {
/** Id of the unprocessed events added by primary statistic */
private int exceptionsOccurredId;
+ /** Id of the events retried statistic */
+ private int eventsRetriedId;
+
// ///////////////////// Constructors ///////////////////////
public static GatewayReceiverStats createGatewayReceiverStats(String ownerName) {
@@ -110,7 +116,9 @@ public class GatewayReceiverStats extends CacheServerStats {
f.createIntCounter(UNKNOWN_OPERATIONS_RECEIVED,
"total number of unknown operations received by this GatewayReceiver", "operations"),
f.createIntCounter(EXCEPTIONS_OCCURRED,
- "number of exceptions occurred while porcessing the batches", "operations")};
+ "number of exceptions occurred while porcessing the batches", "operations"),
+ f.createIntCounter(EVENTS_RETRIED,
+ "total number events retried by this GatewayReceiver due to exceptions", "operations")};
return new GatewayReceiverStats(f, ownerName, typeName, descriptors);
}
@@ -129,6 +137,7 @@ public class GatewayReceiverStats extends CacheServerStats {
destroyRequestId = statType.nameToId(DESTROY_REQUESTS);
unknowsOperationsReceivedId = statType.nameToId(UNKNOWN_OPERATIONS_RECEIVED);
exceptionsOccurredId = statType.nameToId(EXCEPTIONS_OCCURRED);
+ eventsRetriedId = statType.nameToId(EVENTS_RETRIED);
}
// /////////////////// Instance Methods /////////////////////
@@ -244,6 +253,17 @@ public class GatewayReceiverStats extends CacheServerStats {
}
/**
+ * Increments the number of events retried by 1.
+ */
+ public void incEventsRetried() {
+ this.stats.incInt(eventsRetriedId, 1);
+ }
+
+ public int getEventsRetried() {
+ return this.stats.getInt(eventsRetriedId);
+ }
+
+ /**
* Returns the current time (ns).
*
* @return the current time (ns)
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/KeepEventsOnGatewaySenderQueueDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/KeepEventsOnGatewaySenderQueueDUnitTest.java
index 6c89f0f..0f49a88 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/KeepEventsOnGatewaySenderQueueDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/KeepEventsOnGatewaySenderQueueDUnitTest.java
@@ -14,15 +14,23 @@
*/
package org.apache.geode.internal.cache.wan.misc;
+import static org.assertj.core.api.Assertions.*;
+
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import org.awaitility.Awaitility;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
import org.apache.geode.internal.cache.wan.WANTestBase;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.junit.categories.DistributedTest;
@@ -35,10 +43,10 @@ public class KeepEventsOnGatewaySenderQueueDUnitTest extends WANTestBase {
}
@Test
- public void testBasicKeepEventsOnGatewaySenderQueue() throws Exception {
+ public void testKeepEventsOnGatewaySenderQueueWithPartitionOfflineException() throws Exception {
// Start locators
- Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = vm2.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+ Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+ Integer nyPort = vm2.invoke(() -> createFirstRemoteLocator(2, lnPort));
String regionName = getTestMethodName() + "_PR";
String senderId = "ny";
@@ -51,7 +59,7 @@ public class KeepEventsOnGatewaySenderQueueDUnitTest extends WANTestBase {
vm4.invoke(() -> assignBuckets(regionName));
// Configure sending site members
- createCacheInVMs(lnPort, vm1);
+ vm1.invoke(() -> createCache(lnPort));
vm1.invoke(() -> createSender(senderId, 2, true, 100, 10, false, true, null, false));
vm1.invoke(() -> disableRemoveFromQueueOnException(senderId));
vm1.invoke(() -> createPartitionedRegionWithPersistence(regionName, senderId, 0, 100));
@@ -63,7 +71,7 @@ public class KeepEventsOnGatewaySenderQueueDUnitTest extends WANTestBase {
AsyncInvocation<Integer> closeOpenInvocation =
vm3.invokeAsync(() -> closeRecreateCache(nyPort, regionName, 3));
- // Once puts are complete, wait for queue to be empty
+ // Once puts are complete, wait for sending site member queue to be empty
int numPuts = putInvocation.get(120, TimeUnit.SECONDS);
vm1.invoke(() -> validateQueueSizeStat(senderId, 0));
@@ -74,6 +82,43 @@ public class KeepEventsOnGatewaySenderQueueDUnitTest extends WANTestBase {
vm4.invoke(() -> validateRegionSize(regionName, numPuts));
}
+ @Test
+ public void testKeepEventsOnGatewaySenderQueueWithRegionDestroyedException() throws Exception {
+ // Start locators
+ Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+ Integer nyPort = vm2.invoke(() -> createFirstRemoteLocator(2, lnPort));
+
+ String regionName = getTestMethodName() + "_PR";
+ String senderId = "ny";
+
+ // Configure receiving site member
+ vm3.invoke(() -> createCache(nyPort));
+ vm3.invoke(() -> createReceiver());
+
+ // Configure sending site member
+ vm1.invoke(() -> createCache(lnPort));
+ vm1.invoke(() -> createSender(senderId, 2, true, 100, 10, false, true, null, false));
+ vm1.invoke(() -> disableRemoveFromQueueOnException(senderId));
+ vm1.invoke(() -> createPartitionedRegionWithPersistence(regionName, senderId, 0, 100));
+
+ // Do puts in sending site member
+ int numPuts = 10;
+ vm1.invoke(() -> doPuts(regionName, numPuts));
+
+ // Wait for some retries to occur
+ vm3.invoke(() -> waitForEventRetries(10));
+
+ // Create region in receiving site member
+ vm3.invoke(() -> createPartitionedRegionWithPersistence(regionName, null, 0, 100));
+
+ // Wait for sending site member queue to be empty
+ vm1.invoke(() -> validateQueueSizeStat(senderId, 0));
+
+ // Verify region sizes in both sites
+ vm1.invoke(() -> validateRegionSize(regionName, numPuts));
+ vm3.invoke(() -> validateRegionSize(regionName, numPuts));
+ }
+
private void disableRemoveFromQueueOnException(String senderId) {
AbstractGatewaySender ags = (AbstractGatewaySender) cache.getGatewaySender(senderId);
ags.setRemoveFromQueueOnException(false);
@@ -106,4 +151,18 @@ public class KeepEventsOnGatewaySenderQueueDUnitTest extends WANTestBase {
createPartitionedRegionWithPersistence(regionName, null, 0, 100);
}
}
+
+ private void waitForEventRetries(int numRetries) {
+ GatewayReceiverStats stats = getGatewayReceiverStats();
+ Awaitility.await().atMost(60, TimeUnit.SECONDS)
+ .until(() -> stats.getEventsRetried() > numRetries);
+ }
+
+ private GatewayReceiverStats getGatewayReceiverStats() {
+ Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
+ GatewayReceiver receiver = gatewayReceivers.iterator().next();
+ CacheServerStats stats = ((CacheServerImpl) receiver.getServer()).getAcceptor().getStats();
+ assertThat(stats).isInstanceOf(GatewayReceiverStats.class);
+ return (GatewayReceiverStats) stats;
+ }
}
--
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].