You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2017/10/04 18:10:47 UTC

[geode] branch feature/GEODE-3730 updated: GEODE-3730: Moved retry loop around switch statement

This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a commit to branch feature/GEODE-3730
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-3730 by this push:
     new 6e00da4  GEODE-3730: Moved retry loop around switch statement
6e00da4 is described below

commit 6e00da40cf4868a62e5964b98df500bfc57f9521
Author: Barry Oglesby <bo...@pivotal.io>
AuthorDate: Tue Oct 3 09:19:55 2017 -0700

    GEODE-3730: Moved retry loop around switch statement
---
 .../sockets/command/GatewayReceiverCommand.java    | 662 +++++++++++----------
 .../internal/cache/wan/GatewayReceiverStats.java   |  22 +-
 .../KeepEventsOnGatewaySenderQueueDUnitTest.java   |  69 ++-
 3 files changed, 425 insertions(+), 328 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
index 0b928f2..33043b2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
@@ -169,7 +169,7 @@ public class GatewayReceiverCommand extends BaseCommand {
 
     // event received in batch also have PDX events at the start of the batch,to
     // represent correct index on which the exception occurred, number of PDX
-    // events need to be subtratced.
+    // events need to be subtracted.
     int indexWithoutPDXEvent = -1; //
     for (int i = 0; i < numberOfEvents; i++) {
       boolean retry = true;
@@ -186,127 +186,140 @@ public class GatewayReceiverCommand extends BaseCommand {
       boolean callbackArgExists = false;
 
       try {
-        Part possibleDuplicatePart = clientMessage.getPart(partNumber + 1);
-        byte[] possibleDuplicatePartBytes;
-        try {
-          possibleDuplicatePartBytes = (byte[]) possibleDuplicatePart.getObject();
-        } catch (Exception e) {
-          logger.warn(LocalizedMessage.create(
-              LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS,
-              new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                  Integer.valueOf(numberOfEvents)}),
-              e);
-          throw e;
-        }
-        boolean possibleDuplicate = possibleDuplicatePartBytes[0] == 0x01;
-
-        // Make sure instance variables are null before each iteration
-        regionName = null;
-        key = null;
-        callbackArg = null;
-
-        // Retrieve the region name from the message parts
-        regionNamePart = clientMessage.getPart(partNumber + 2);
-        regionName = regionNamePart.getString();
-        if (regionName.equals(PeerTypeRegistration.REGION_FULL_PATH)) {
-          indexWithoutPDXEvent--;
-          isPdxEvent = true;
-        }
+        do {
+          if (isPdxEvent) {
+            // This is a retried event. Reset the PDX event index.
+            indexWithoutPDXEvent++;
+          }
+          isPdxEvent = false;
+          Part possibleDuplicatePart = clientMessage.getPart(partNumber + 1);
+          byte[] possibleDuplicatePartBytes;
+          try {
+            possibleDuplicatePartBytes = (byte[]) possibleDuplicatePart.getObject();
+          } catch (Exception e) {
+            logger.warn(LocalizedMessage.create(
+                LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS,
+                new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
+                    Integer.valueOf(numberOfEvents)}),
+                e);
+            handleException(removeOnException, stats, e);
+            break;
+          }
+          boolean possibleDuplicate = possibleDuplicatePartBytes[0] == 0x01;
+
+          // Make sure instance variables are null before each iteration
+          regionName = null;
+          key = null;
+          callbackArg = null;
+
+          // Retrieve the region name from the message parts
+          regionNamePart = clientMessage.getPart(partNumber + 2);
+          regionName = regionNamePart.getString();
+          if (regionName.equals(PeerTypeRegistration.REGION_FULL_PATH)) {
+            indexWithoutPDXEvent--;
+            isPdxEvent = true;
+          }
 
-        // Retrieve the event id from the message parts
-        // This was going to be used to determine possible
-        // duplication of events, but it is unused now. In
-        // fact the event id is overridden by the FROM_GATEWAY
-        // token.
-        Part eventIdPart = clientMessage.getPart(partNumber + 3);
-        eventIdPart.setVersion(serverConnection.getClientVersion());
-        // String eventId = eventIdPart.getString();
-        try {
-          eventId = (EventID) eventIdPart.getObject();
-        } catch (Exception e) {
-          logger.warn(LocalizedMessage.create(
-              LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS,
-              new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                  Integer.valueOf(numberOfEvents)}),
-              e);
-          throw e;
-        }
+          // Retrieve the event id from the message parts
+          // This was going to be used to determine possible
+          // duplication of events, but it is unused now. In
+          // fact the event id is overridden by the FROM_GATEWAY
+          // token.
+          Part eventIdPart = clientMessage.getPart(partNumber + 3);
+          eventIdPart.setVersion(serverConnection.getClientVersion());
+          // String eventId = eventIdPart.getString();
+          try {
+            eventId = (EventID) eventIdPart.getObject();
+          } catch (Exception e) {
+            logger.warn(LocalizedMessage.create(
+                LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS,
+                new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
+                    Integer.valueOf(numberOfEvents)}),
+                e);
+            handleException(removeOnException, stats, e);
+            break;
+          }
 
-        // Retrieve the key from the message parts
-        keyPart = clientMessage.getPart(partNumber + 4);
-        try {
-          key = keyPart.getStringOrObject();
-        } catch (Exception e) {
-          logger.warn(LocalizedMessage.create(
-              LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS,
-              new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                  Integer.valueOf(numberOfEvents)}),
-              e);
-          throw e;
-        }
-        switch (actionType) {
-          case 0: // Create
-
-            /*
-             * CLIENT EXCEPTION HANDLING TESTING CODE String keySt = (String) key;
-             * System.out.println("Processing new key: " + key); if (keySt.startsWith("failure")) {
-             * throw new Exception(LocalizedStrings
-             * .ProcessBatch_THIS_EXCEPTION_REPRESENTS_A_FAILURE_ON_THE_SERVER
-             * .toLocalizedString()); }
-             */
-
-            // Retrieve the value from the message parts (do not deserialize it)
-            valuePart = clientMessage.getPart(partNumber + 5);
-            // try {
-            // logger.warn(getName() + ": Creating key " + key + " value " +
-            // valuePart.getObject());
-            // } catch (Exception e) {}
-
-            // Retrieve the callbackArg from the message parts if necessary
-            int index = partNumber + 6;
-            callbackArgExistsPart = clientMessage.getPart(index++); {
-            byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
-            callbackArgExists = partBytes[0] == 0x01;
+          // Retrieve the key from the message parts
+          keyPart = clientMessage.getPart(partNumber + 4);
+          try {
+            key = keyPart.getStringOrObject();
+          } catch (Exception e) {
+            logger.warn(LocalizedMessage.create(
+                LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS,
+                new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
+                    Integer.valueOf(numberOfEvents)}),
+                e);
+            handleException(removeOnException, stats, e);
+            break;
           }
-            if (callbackArgExists) {
-              callbackArgPart = clientMessage.getPart(index++);
-              try {
-                callbackArg = callbackArgPart.getObject();
-              } catch (Exception e) {
-                logger.warn(LocalizedMessage.create(
-                    LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_CREATE_REQUEST_1_FOR_2_EVENTS,
-                    new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                        Integer.valueOf(numberOfEvents)}),
-                    e);
-                throw e;
-              }
-            }
-            if (logger.isDebugEnabled()) {
-              logger.debug(
-                  "{}: Processing batch create request {} on {} for region {} key {} value {} callbackArg {}, eventId={}",
-                  serverConnection.getName(), batchId, serverConnection.getSocketString(),
-                  regionName, key, valuePart, callbackArg, eventId);
-            }
-            versionTimeStamp = clientMessage.getPart(index++).getLong();
-            // Process the create request
-            if (key == null || regionName == null) {
-              StringId message = null;
-              Object[] messageArgs =
-                  new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
-              if (key == null) {
-                message =
-                    LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_CREATE_REQUEST_1_IS_NULL;
-              }
-              if (regionName == null) {
-                message =
-                    LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_CREATE_REQUEST_1_IS_NULL;
-              }
-              String s = message.toLocalizedString(messageArgs);
-              logger.warn(s);
-              throw new Exception(s);
-            }
-            do {
+          int index = -1;
+          switch (actionType) {
+            case 0: // Create
               try {
+
+                /*
+                 * CLIENT EXCEPTION HANDLING TESTING CODE String keySt = (String) key;
+                 * System.out.println("Processing new key: " + key); if
+                 * (keySt.startsWith("failure")) { throw new Exception(LocalizedStrings
+                 * .ProcessBatch_THIS_EXCEPTION_REPRESENTS_A_FAILURE_ON_THE_SERVER
+                 * .toLocalizedString()); }
+                 */
+
+                // Retrieve the value from the message parts (do not deserialize it)
+                valuePart = clientMessage.getPart(partNumber + 5);
+                // try {
+                // logger.warn(getName() + ": Creating key " + key + " value " +
+                // valuePart.getObject());
+                // } catch (Exception e) {}
+
+                // Retrieve the callbackArg from the message parts if necessary
+                index = partNumber + 6;
+                callbackArgExistsPart = clientMessage.getPart(index++);
+                {
+                  byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
+                  callbackArgExists = partBytes[0] == 0x01;
+                }
+                if (callbackArgExists) {
+                  callbackArgPart = clientMessage.getPart(index++);
+                  try {
+                    callbackArg = callbackArgPart.getObject();
+                  } catch (Exception e) {
+                    logger
+                        .warn(
+                            LocalizedMessage
+                                .create(
+                                    LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_CREATE_REQUEST_1_FOR_2_EVENTS,
+                                    new Object[] {serverConnection.getName(),
+                                        Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}),
+                            e);
+                    throw e;
+                  }
+                }
+                if (logger.isDebugEnabled()) {
+                  logger.debug(
+                      "{}: Processing batch create request {} on {} for region {} key {} value {} callbackArg {}, eventId={}",
+                      serverConnection.getName(), batchId, serverConnection.getSocketString(),
+                      regionName, key, valuePart, callbackArg, eventId);
+                }
+                versionTimeStamp = clientMessage.getPart(index++).getLong();
+                // Process the create request
+                if (key == null || regionName == null) {
+                  StringId message = null;
+                  Object[] messageArgs =
+                      new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
+                  if (key == null) {
+                    message =
+                        LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_CREATE_REQUEST_1_IS_NULL;
+                  }
+                  if (regionName == null) {
+                    message =
+                        LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_CREATE_REQUEST_1_IS_NULL;
+                  }
+                  String s = message.toLocalizedString(messageArgs);
+                  logger.warn(s);
+                  throw new Exception(s);
+                }
                 region = (LocalRegion) crHelper.getRegion(regionName);
                 if (region == null) {
                   handleRegionNull(serverConnection, regionName, batchId);
@@ -365,71 +378,73 @@ public class GatewayReceiverCommand extends BaseCommand {
                     new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
                         Integer.valueOf(numberOfEvents)}),
                     e);
-                handleException(removeOnException, e);
+                handleException(removeOnException, stats, e);
               }
-            } while (retry);
-            break;
-          case 1: // Update
-            /*
-             * CLIENT EXCEPTION HANDLING TESTING CODE keySt = (String) key;
-             * System.out.println("Processing updated key: " + key); if
-             * (keySt.startsWith("failure")) { throw new Exception(LocalizedStrings
-             * .ProcessBatch_THIS_EXCEPTION_REPRESENTS_A_FAILURE_ON_THE_SERVER
-             * .toLocalizedString()); }
-             */
-
-            // Retrieve the value from the message parts (do not deserialize it)
-            valuePart = clientMessage.getPart(partNumber + 5);
-            // try {
-            // logger.warn(getName() + ": Updating key " + key + " value " +
-            // valuePart.getObject());
-            // } catch (Exception e) {}
-
-            // Retrieve the callbackArg from the message parts if necessary
-            index = partNumber + 6;
-            callbackArgExistsPart = clientMessage.getPart(index++); {
-            byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
-            callbackArgExists = partBytes[0] == 0x01;
-          }
-            if (callbackArgExists) {
-              callbackArgPart = clientMessage.getPart(index++);
-              try {
-                callbackArg = callbackArgPart.getObject();
-              } catch (Exception e) {
-                logger.warn(LocalizedMessage.create(
-                    LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_REQUEST_1_CONTAINING_2_EVENTS,
-                    new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                        Integer.valueOf(numberOfEvents)}),
-                    e);
-                throw e;
-              }
-            }
-            versionTimeStamp = clientMessage.getPart(index++).getLong();
-            if (logger.isDebugEnabled()) {
-              logger.debug(
-                  "{}: Processing batch update request {} on {} for region {} key {} value {} callbackArg {}",
-                  serverConnection.getName(), batchId, serverConnection.getSocketString(),
-                  regionName, key, valuePart, callbackArg);
-            }
-            // Process the update request
-            if (key == null || regionName == null) {
-              StringId message = null;
-              Object[] messageArgs =
-                  new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
-              if (key == null) {
-                message =
-                    LocalizedStrings.ProcessBatch_0_THE_INPUT_KEY_FOR_THE_BATCH_UPDATE_REQUEST_1_IS_NULL;
-              }
-              if (regionName == null) {
-                message =
-                    LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_UPDATE_REQUEST_1_IS_NULL;
-              }
-              String s = message.toLocalizedString(messageArgs);
-              logger.warn(s);
-              throw new Exception(s);
-            }
-            do {
+              break;
+            case 1: // Update
               try {
+                /*
+                 * CLIENT EXCEPTION HANDLING TESTING CODE keySt = (String) key;
+                 * System.out.println("Processing updated key: " + key); if
+                 * (keySt.startsWith("failure")) { throw new Exception(LocalizedStrings
+                 * .ProcessBatch_THIS_EXCEPTION_REPRESENTS_A_FAILURE_ON_THE_SERVER
+                 * .toLocalizedString()); }
+                 */
+
+                // Retrieve the value from the message parts (do not deserialize it)
+                valuePart = clientMessage.getPart(partNumber + 5);
+                // try {
+                // logger.warn(getName() + ": Updating key " + key + " value " +
+                // valuePart.getObject());
+                // } catch (Exception e) {}
+
+                // Retrieve the callbackArg from the message parts if necessary
+                index = partNumber + 6;
+                callbackArgExistsPart = clientMessage.getPart(index++);
+                {
+                  byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
+                  callbackArgExists = partBytes[0] == 0x01;
+                }
+                if (callbackArgExists) {
+                  callbackArgPart = clientMessage.getPart(index++);
+                  try {
+                    callbackArg = callbackArgPart.getObject();
+                  } catch (Exception e) {
+                    logger
+                        .warn(
+                            LocalizedMessage
+                                .create(
+                                    LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_REQUEST_1_CONTAINING_2_EVENTS,
+                                    new Object[] {serverConnection.getName(),
+                                        Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}),
+                            e);
+                    throw e;
+                  }
+                }
+                versionTimeStamp = clientMessage.getPart(index++).getLong();
+                if (logger.isDebugEnabled()) {
+                  logger.debug(
+                      "{}: Processing batch update request {} on {} for region {} key {} value {} callbackArg {}",
+                      serverConnection.getName(), batchId, serverConnection.getSocketString(),
+                      regionName, key, valuePart, callbackArg);
+                }
+                // Process the update request
+                if (key == null || regionName == null) {
+                  StringId message = null;
+                  Object[] messageArgs =
+                      new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
+                  if (key == null) {
+                    message =
+                        LocalizedStrings.ProcessBatch_0_THE_INPUT_KEY_FOR_THE_BATCH_UPDATE_REQUEST_1_IS_NULL;
+                  }
+                  if (regionName == null) {
+                    message =
+                        LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_UPDATE_REQUEST_1_IS_NULL;
+                  }
+                  String s = message.toLocalizedString(messageArgs);
+                  logger.warn(s);
+                  throw new Exception(s);
+                }
                 region = (LocalRegion) crHelper.getRegion(regionName);
                 if (region == null) {
                   handleRegionNull(serverConnection, regionName, batchId);
@@ -481,57 +496,59 @@ public class GatewayReceiverCommand extends BaseCommand {
                     new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
                         Integer.valueOf(numberOfEvents)}),
                     e);
-                handleException(removeOnException, e);
-              }
-            } while (retry);
-            break;
-          case 2: // Destroy
-            // Retrieve the callbackArg from the message parts if necessary
-            index = partNumber + 5;
-            callbackArgExistsPart = clientMessage.getPart(index++); {
-            byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
-            callbackArgExists = partBytes[0] == 0x01;
-          }
-            if (callbackArgExists) {
-              callbackArgPart = clientMessage.getPart(index++);
-              try {
-                callbackArg = callbackArgPart.getObject();
-              } catch (Exception e) {
-                logger.warn(LocalizedMessage.create(
-                    LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_DESTROY_REQUEST_1_CONTAINING_2_EVENTS,
-                    new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                        Integer.valueOf(numberOfEvents)}),
-                    e);
-                throw e;
-              }
-            }
-
-            versionTimeStamp = clientMessage.getPart(index++).getLong();
-            if (logger.isDebugEnabled()) {
-              logger.debug("{}: Processing batch destroy request {} on {} for region {} key {}",
-                  serverConnection.getName(), batchId, serverConnection.getSocketString(),
-                  regionName, key);
-            }
-
-            // Process the destroy request
-            if (key == null || regionName == null) {
-              StringId message = null;
-              if (key == null) {
-                message =
-                    LocalizedStrings.ProcessBatch_0_THE_INPUT_KEY_FOR_THE_BATCH_DESTROY_REQUEST_1_IS_NULL;
+                handleException(removeOnException, stats, e);
               }
-              if (regionName == null) {
-                message =
-                    LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_DESTROY_REQUEST_1_IS_NULL;
-              }
-              Object[] messageArgs =
-                  new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
-              String s = message.toLocalizedString(messageArgs);
-              logger.warn(s);
-              throw new Exception(s);
-            }
-            do {
+              break;
+            case 2: // Destroy
               try {
+                // Retrieve the callbackArg from the message parts if necessary
+                index = partNumber + 5;
+                callbackArgExistsPart = clientMessage.getPart(index++);
+                {
+                  byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
+                  callbackArgExists = partBytes[0] == 0x01;
+                }
+                if (callbackArgExists) {
+                  callbackArgPart = clientMessage.getPart(index++);
+                  try {
+                    callbackArg = callbackArgPart.getObject();
+                  } catch (Exception e) {
+                    logger
+                        .warn(
+                            LocalizedMessage
+                                .create(
+                                    LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_DESTROY_REQUEST_1_CONTAINING_2_EVENTS,
+                                    new Object[] {serverConnection.getName(),
+                                        Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}),
+                            e);
+                    throw e;
+                  }
+                }
+
+                versionTimeStamp = clientMessage.getPart(index++).getLong();
+                if (logger.isDebugEnabled()) {
+                  logger.debug("{}: Processing batch destroy request {} on {} for region {} key {}",
+                      serverConnection.getName(), batchId, serverConnection.getSocketString(),
+                      regionName, key);
+                }
+
+                // Process the destroy request
+                if (key == null || regionName == null) {
+                  StringId message = null;
+                  if (key == null) {
+                    message =
+                        LocalizedStrings.ProcessBatch_0_THE_INPUT_KEY_FOR_THE_BATCH_DESTROY_REQUEST_1_IS_NULL;
+                  }
+                  if (regionName == null) {
+                    message =
+                        LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_DESTROY_REQUEST_1_IS_NULL;
+                  }
+                  Object[] messageArgs =
+                      new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
+                  String s = message.toLocalizedString(messageArgs);
+                  logger.warn(s);
+                  throw new Exception(s);
+                }
                 region = (LocalRegion) crHelper.getRegion(regionName);
                 if (region == null) {
                   handleRegionNull(serverConnection, regionName, batchId);
@@ -552,121 +569,120 @@ public class GatewayReceiverCommand extends BaseCommand {
                         authzRequest.destroyAuthorize(regionName, key, callbackArg);
                     callbackArg = destroyContext.getCallbackArg();
                   }
-                  region.basicBridgeDestroy(key, callbackArg, serverConnection.getProxyID(), false,
-                      clientEvent);
-                  serverConnection.setModificationInfo(true, regionName, key);
+                  try {
+                    region.basicBridgeDestroy(key, callbackArg, serverConnection.getProxyID(),
+                        false, clientEvent);
+                    serverConnection.setModificationInfo(true, regionName, key);
+                  } catch (EntryNotFoundException e) {
+                    logger.info(LocalizedMessage.create(
+                        LocalizedStrings.ProcessBatch_0_DURING_BATCH_DESTROY_NO_ENTRY_WAS_FOUND_FOR_KEY_1,
+                        new Object[] {serverConnection.getName(), key}));
+                    // throw new Exception(e);
+                  }
                   stats.incDestroyRequest();
                   retry = false;
                 }
-              } catch (EntryNotFoundException e) {
-                logger.info(LocalizedMessage.create(
-                    LocalizedStrings.ProcessBatch_0_DURING_BATCH_DESTROY_NO_ENTRY_WAS_FOUND_FOR_KEY_1,
-                    new Object[] {serverConnection.getName(), key}));
-                // throw new Exception(e);
               } catch (Exception e) {
                 logger.warn(LocalizedMessage.create(
                     LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_DESTROY_REQUEST_1_CONTAINING_2_EVENTS,
                     new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
                         Integer.valueOf(numberOfEvents)}),
                     e);
-                handleException(removeOnException, e);
+                handleException(removeOnException, stats, e);
               }
-            } while (retry);
-            break;
-          case 3: // Update Time-stamp for a RegionEntry
+              break;
+            case 3: // Update Time-stamp for a RegionEntry
 
-            try {
-              // Region name
-              regionNamePart = clientMessage.getPart(partNumber + 2);
-              regionName = regionNamePart.getString();
+              try {
+                // Region name
+                regionNamePart = clientMessage.getPart(partNumber + 2);
+                regionName = regionNamePart.getString();
 
-              // Retrieve the event id from the message parts
-              eventIdPart = clientMessage.getPart(partNumber + 3);
-              eventId = (EventID) eventIdPart.getObject();
+                // Retrieve the event id from the message parts
+                eventIdPart = clientMessage.getPart(partNumber + 3);
+                eventId = (EventID) eventIdPart.getObject();
 
-              // Retrieve the key from the message parts
-              keyPart = clientMessage.getPart(partNumber + 4);
-              key = keyPart.getStringOrObject();
+                // Retrieve the key from the message parts
+                keyPart = clientMessage.getPart(partNumber + 4);
+                key = keyPart.getStringOrObject();
 
-              // Retrieve the callbackArg from the message parts if necessary
-              index = partNumber + 5;
-              callbackArgExistsPart = clientMessage.getPart(index++);
+                // Retrieve the callbackArg from the message parts if necessary
+                index = partNumber + 5;
+                callbackArgExistsPart = clientMessage.getPart(index++);
 
-              byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
-              callbackArgExists = partBytes[0] == 0x01;
+                byte[] partBytes = (byte[]) callbackArgExistsPart.getObject();
+                callbackArgExists = partBytes[0] == 0x01;
 
-              if (callbackArgExists) {
-                callbackArgPart = clientMessage.getPart(index++);
-                callbackArg = callbackArgPart.getObject();
-              }
+                if (callbackArgExists) {
+                  callbackArgPart = clientMessage.getPart(index++);
+                  callbackArg = callbackArgPart.getObject();
+                }
 
-            } catch (Exception e) {
-              logger.warn(LocalizedMessage.create(
-                  LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_VERSION_REQUEST_1_CONTAINING_2_EVENTS,
-                  new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                      Integer.valueOf(numberOfEvents)}),
-                  e);
-              throw e;
-            }
-
-            versionTimeStamp = clientMessage.getPart(index++).getLong();
-            if (logger.isDebugEnabled()) {
-              logger.debug(
-                  "{}: Processing batch update-version request {} on {} for region {} key {} value {} callbackArg {}",
-                  serverConnection.getName(), batchId, serverConnection.getSocketString(),
-                  regionName, key, valuePart, callbackArg);
-            }
-            // Process the update time-stamp request
-            if (key == null || regionName == null) {
-              StringId message =
-                  LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_VERSION_REQUEST_1_CONTAINING_2_EVENTS;
-
-              Object[] messageArgs = new Object[] {serverConnection.getName(),
-                  Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)};
-              String s = message.toLocalizedString(messageArgs);
-              logger.warn(s);
-              throw new Exception(s);
-
-            } else {
-              region = (LocalRegion) crHelper.getRegion(regionName);
-
-              if (region == null) {
-                handleRegionNull(serverConnection, regionName, batchId);
-              } else {
-
-                clientEvent = new EventIDHolder(eventId);
-
-                if (versionTimeStamp > 0) {
-                  VersionTag tag = VersionTag.create(region.getVersionMember());
-                  tag.setIsGatewayTag(true);
-                  tag.setVersionTimeStamp(versionTimeStamp);
-                  tag.setDistributedSystemId(dsid);
-                  clientEvent.setVersionTag(tag);
+                versionTimeStamp = clientMessage.getPart(index++).getLong();
+                if (logger.isDebugEnabled()) {
+                  logger.debug(
+                      "{}: Processing batch update-version request {} on {} for region {} key {} value {} callbackArg {}",
+                      serverConnection.getName(), batchId, serverConnection.getSocketString(),
+                      regionName, key, valuePart, callbackArg);
                 }
+                // Process the update time-stamp request
+                if (key == null || regionName == null) {
+                  StringId message =
+                      LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_VERSION_REQUEST_1_CONTAINING_2_EVENTS;
+
+                  Object[] messageArgs = new Object[] {serverConnection.getName(),
+                      Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)};
+                  String s = message.toLocalizedString(messageArgs);
+                  logger.warn(s);
+                  throw new Exception(s);
+
+                } else {
+                  region = (LocalRegion) crHelper.getRegion(regionName);
+
+                  if (region == null) {
+                    handleRegionNull(serverConnection, regionName, batchId);
+                  } else {
 
-                // Update the version tag
-                try {
+                    clientEvent = new EventIDHolder(eventId);
 
-                  region.basicBridgeUpdateVersionStamp(key, callbackArg,
-                      serverConnection.getProxyID(), false, clientEvent);
+                    if (versionTimeStamp > 0) {
+                      VersionTag tag = VersionTag.create(region.getVersionMember());
+                      tag.setIsGatewayTag(true);
+                      tag.setVersionTimeStamp(versionTimeStamp);
+                      tag.setDistributedSystemId(dsid);
+                      clientEvent.setVersionTag(tag);
+                    }
 
-                } catch (EntryNotFoundException e) {
-                  logger.info(LocalizedMessage.create(
-                      LocalizedStrings.ProcessBatch_0_DURING_BATCH_UPDATE_VERSION_NO_ENTRY_WAS_FOUND_FOR_KEY_1,
-                      new Object[] {serverConnection.getName(), key}));
-                  // throw new Exception(e);
+                    // Update the version tag
+                    try {
+                      region.basicBridgeUpdateVersionStamp(key, callbackArg,
+                          serverConnection.getProxyID(), false, clientEvent);
+                    } catch (EntryNotFoundException e) {
+                      logger.info(LocalizedMessage.create(
+                          LocalizedStrings.ProcessBatch_0_DURING_BATCH_UPDATE_VERSION_NO_ENTRY_WAS_FOUND_FOR_KEY_1,
+                          new Object[] {serverConnection.getName(), key}));
+                    }
+                    retry = false;
+                  }
                 }
+              } catch (Exception e) {
+                logger.warn(LocalizedMessage.create(
+                    LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_VERSION_REQUEST_1_CONTAINING_2_EVENTS,
+                    new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
+                        Integer.valueOf(numberOfEvents)}),
+                    e);
+                handleException(removeOnException, stats, e);
               }
-            }
 
-            break;
-          default:
-            logger.fatal(LocalizedMessage.create(
-                LocalizedStrings.Processbatch_0_UNKNOWN_ACTION_TYPE_1_FOR_BATCH_FROM_2,
-                new Object[] {serverConnection.getName(), Integer.valueOf(actionType),
-                    serverConnection.getSocketString()}));
-            stats.incUnknowsOperationsReceived();
-        }
+              break;
+            default:
+              logger.fatal(LocalizedMessage.create(
+                  LocalizedStrings.Processbatch_0_UNKNOWN_ACTION_TYPE_1_FOR_BATCH_FROM_2,
+                  new Object[] {serverConnection.getName(), Integer.valueOf(actionType),
+                      serverConnection.getSocketString()}));
+              stats.incUnknowsOperationsReceived();
+          }
+        } while (retry);
       } catch (CancelException e) {
         if (logger.isDebugEnabled()) {
           logger.debug(
@@ -766,10 +782,12 @@ public class GatewayReceiverCommand extends BaseCommand {
     return true;
   }
 
-  private void handleException(boolean removeOnException, Exception e) throws Exception {
+  private void handleException(boolean removeOnException, GatewayReceiverStats stats, Exception e)
+      throws Exception {
     if (shouldThrowException(removeOnException, e)) {
       throw e;
     } else {
+      stats.incEventsRetried();
       Thread.sleep(500);
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java
index 0561b4f..6935dc6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java
@@ -56,6 +56,9 @@ public class GatewayReceiverStats extends CacheServerStats {
   /** Name of the unprocessed events added by primary statistic */
   private static final String EXCEPTIONS_OCCURRED = "exceptionsOccurred";
 
+  /** Name of the events retried statistic */
+  private static final String EVENTS_RETRIED = "eventsRetried";
+
   // /** Id of the events queued statistic */
   // private int failoverBatchesReceivedId;
 
@@ -86,6 +89,9 @@ public class GatewayReceiverStats extends CacheServerStats {
   /** Id of the unprocessed events added by primary statistic */
   private int exceptionsOccurredId;
 
+  /** Id of the events retried statistic */
+  private int eventsRetriedId;
+
   // ///////////////////// Constructors ///////////////////////
 
   public static GatewayReceiverStats createGatewayReceiverStats(String ownerName) {
@@ -110,7 +116,9 @@ public class GatewayReceiverStats extends CacheServerStats {
         f.createIntCounter(UNKNOWN_OPERATIONS_RECEIVED,
             "total number of unknown operations received by this GatewayReceiver", "operations"),
         f.createIntCounter(EXCEPTIONS_OCCURRED,
-            "number of exceptions occurred while porcessing the batches", "operations")};
+            "number of exceptions occurred while porcessing the batches", "operations"),
+        f.createIntCounter(EVENTS_RETRIED,
+            "total number events retried by this GatewayReceiver due to exceptions", "operations")};
     return new GatewayReceiverStats(f, ownerName, typeName, descriptors);
 
   }
@@ -129,6 +137,7 @@ public class GatewayReceiverStats extends CacheServerStats {
     destroyRequestId = statType.nameToId(DESTROY_REQUESTS);
     unknowsOperationsReceivedId = statType.nameToId(UNKNOWN_OPERATIONS_RECEIVED);
     exceptionsOccurredId = statType.nameToId(EXCEPTIONS_OCCURRED);
+    eventsRetriedId = statType.nameToId(EVENTS_RETRIED);
   }
 
   // /////////////////// Instance Methods /////////////////////
@@ -244,6 +253,17 @@ public class GatewayReceiverStats extends CacheServerStats {
   }
 
   /**
+   * Increments the number of events retried by 1.
+   */
+  public void incEventsRetried() {
+    this.stats.incInt(eventsRetriedId, 1);
+  }
+
+  public int getEventsRetried() {
+    return this.stats.getInt(eventsRetriedId);
+  }
+
+  /**
    * Returns the current time (ns).
    * 
    * @return the current time (ns)
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/KeepEventsOnGatewaySenderQueueDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/KeepEventsOnGatewaySenderQueueDUnitTest.java
index 6c89f0f..0f49a88 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/KeepEventsOnGatewaySenderQueueDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/KeepEventsOnGatewaySenderQueueDUnitTest.java
@@ -14,15 +14,23 @@
  */
 package org.apache.geode.internal.cache.wan.misc;
 
+import static org.assertj.core.api.Assertions.*;
+
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import org.awaitility.Awaitility;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
 import org.apache.geode.internal.cache.wan.WANTestBase;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.junit.categories.DistributedTest;
@@ -35,10 +43,10 @@ public class KeepEventsOnGatewaySenderQueueDUnitTest extends WANTestBase {
   }
 
   @Test
-  public void testBasicKeepEventsOnGatewaySenderQueue() throws Exception {
+  public void testKeepEventsOnGatewaySenderQueueWithPartitionOfflineException() throws Exception {
     // Start locators
-    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = vm2.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+    Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+    Integer nyPort = vm2.invoke(() -> createFirstRemoteLocator(2, lnPort));
 
     String regionName = getTestMethodName() + "_PR";
     String senderId = "ny";
@@ -51,7 +59,7 @@ public class KeepEventsOnGatewaySenderQueueDUnitTest extends WANTestBase {
     vm4.invoke(() -> assignBuckets(regionName));
 
     // Configure sending site members
-    createCacheInVMs(lnPort, vm1);
+    vm1.invoke(() -> createCache(lnPort));
     vm1.invoke(() -> createSender(senderId, 2, true, 100, 10, false, true, null, false));
     vm1.invoke(() -> disableRemoveFromQueueOnException(senderId));
     vm1.invoke(() -> createPartitionedRegionWithPersistence(regionName, senderId, 0, 100));
@@ -63,7 +71,7 @@ public class KeepEventsOnGatewaySenderQueueDUnitTest extends WANTestBase {
     AsyncInvocation<Integer> closeOpenInvocation =
         vm3.invokeAsync(() -> closeRecreateCache(nyPort, regionName, 3));
 
-    // Once puts are complete, wait for queue to be empty
+    // Once puts are complete, wait for sending site member queue to be empty
     int numPuts = putInvocation.get(120, TimeUnit.SECONDS);
     vm1.invoke(() -> validateQueueSizeStat(senderId, 0));
 
@@ -74,6 +82,43 @@ public class KeepEventsOnGatewaySenderQueueDUnitTest extends WANTestBase {
     vm4.invoke(() -> validateRegionSize(regionName, numPuts));
   }
 
+  @Test
+  public void testKeepEventsOnGatewaySenderQueueWithRegionDestroyedException() throws Exception {
+    // Start locators
+    Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+    Integer nyPort = vm2.invoke(() -> createFirstRemoteLocator(2, lnPort));
+
+    String regionName = getTestMethodName() + "_PR";
+    String senderId = "ny";
+
+    // Configure receiving site member
+    vm3.invoke(() -> createCache(nyPort));
+    vm3.invoke(() -> createReceiver());
+
+    // Configure sending site member
+    vm1.invoke(() -> createCache(lnPort));
+    vm1.invoke(() -> createSender(senderId, 2, true, 100, 10, false, true, null, false));
+    vm1.invoke(() -> disableRemoveFromQueueOnException(senderId));
+    vm1.invoke(() -> createPartitionedRegionWithPersistence(regionName, senderId, 0, 100));
+
+    // Do puts in sending site member
+    int numPuts = 10;
+    vm1.invoke(() -> doPuts(regionName, numPuts));
+
+    // Wait for some retries to occur
+    vm3.invoke(() -> waitForEventRetries(10));
+
+    // Create region in receiving site member
+    vm3.invoke(() -> createPartitionedRegionWithPersistence(regionName, null, 0, 100));
+
+    // Wait for sending site member queue to be empty
+    vm1.invoke(() -> validateQueueSizeStat(senderId, 0));
+
+    // Verify region sizes in both sites
+    vm1.invoke(() -> validateRegionSize(regionName, numPuts));
+    vm3.invoke(() -> validateRegionSize(regionName, numPuts));
+  }
+
   private void disableRemoveFromQueueOnException(String senderId) {
     AbstractGatewaySender ags = (AbstractGatewaySender) cache.getGatewaySender(senderId);
     ags.setRemoveFromQueueOnException(false);
@@ -106,4 +151,18 @@ public class KeepEventsOnGatewaySenderQueueDUnitTest extends WANTestBase {
       createPartitionedRegionWithPersistence(regionName, null, 0, 100);
     }
   }
+
+  private void waitForEventRetries(int numRetries) {
+    GatewayReceiverStats stats = getGatewayReceiverStats();
+    Awaitility.await().atMost(60, TimeUnit.SECONDS)
+        .until(() -> stats.getEventsRetried() > numRetries);
+  }
+
+  private GatewayReceiverStats getGatewayReceiverStats() {
+    Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
+    GatewayReceiver receiver = gatewayReceivers.iterator().next();
+    CacheServerStats stats = ((CacheServerImpl) receiver.getServer()).getAcceptor().getStats();
+    assertThat(stats).isInstanceOf(GatewayReceiverStats.class);
+    return (GatewayReceiverStats) stats;
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.