You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2016/03/21 21:39:56 UTC

[25/54] [abbrv] incubator-geode git commit: GEODE-478: GatewaySender now handles MessageTooLargeExceptions

GEODE-478: GatewaySender now handles MessageTooLargeExceptions


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/a904f147
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/a904f147
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/a904f147

Branch: refs/heads/feature/GEODE-17-2
Commit: a904f1474ec3153bc39f650d49731214f25c6230
Parents: 4d0dfc5
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Tue Mar 8 15:55:34 2016 -0800
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Wed Mar 16 09:56:41 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/tier/sockets/Message.java    |  2 +-
 .../AbstractGatewaySenderEventProcessor.java    | 33 +++++++--
 .../parallel/ParallelGatewaySenderQueue.java    | 74 ++++++++++++++++----
 .../gemfire/internal/i18n/LocalizedStrings.java |  6 +-
 .../wan/GatewaySenderEventRemoteDispatcher.java | 31 +++++++-
 .../gemfire/internal/cache/wan/WANTestBase.java | 26 +++++--
 ...arallelGatewaySenderOperationsDUnitTest.java | 35 +++++++++
 7 files changed, 177 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
index a6495e2..44c88c1 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
@@ -564,7 +564,7 @@ public class Message  {
         msgLen = (int)(headerLen + totalPartLen);
         
         if (msgLen > MAX_MESSAGE_SIZE) {
-          throw new MessageTooLargeException("Message size(" + msgLen
+          throw new MessageTooLargeException("Message size (" + msgLen
               + ") exceeds gemfire.client.max-message-size setting (" + MAX_MESSAGE_SIZE + ")");
         }
         

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 86ecce1..51b125a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -38,12 +38,9 @@ import com.gemstone.gemfire.cache.EntryEvent;
 import com.gemstone.gemfire.cache.Operation;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionDestroyedException;
-import com.gemstone.gemfire.cache.client.internal.Connection;
-import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException;
 import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
 import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
 import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.BucketRegion;
 import com.gemstone.gemfire.internal.cache.Conflatable;
 import com.gemstone.gemfire.internal.cache.DistributedRegion;
@@ -143,6 +140,13 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
   private volatile boolean resetLastPeekedEvents;
   
   private long numEventsDispatched;
+
+  /**
+   * The batchSize is the batch size being used by this processor. By default, it is the
+   * configured batch size of the GatewaySender. It may be automatically reduced if a
+   * MessageTooLargeException occurs.
+   */
+  private int batchSize;
   
   /**
    * @param createThreadGroup
@@ -152,6 +156,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
       String string, GatewaySender sender) {
     super(createThreadGroup, string);
     this.sender = (AbstractGatewaySender)sender;
+    this.batchSize = sender.getBatchSize();
   }
 
   abstract protected void initializeMessageQueue(String id);
@@ -214,6 +219,23 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
     this.resetLastPeekedEvents = true;
   }
 
+  protected int getBatchSize() {
+    return this.batchSize;
+  }
+
+  protected void setBatchSize(int batchSize) {
+    int currentBatchSize = this.batchSize;
+    if (batchSize <= 0) {
+      this.batchSize = 1;
+      logger.warn(LocalizedMessage.create(
+          LocalizedStrings.AbstractGatewaySenderEventProcessor_ATTEMPT_TO_SET_BATCH_SIZE_FAILED, new Object[] { currentBatchSize, batchSize }));
+    } else {
+      this.batchSize = batchSize;
+      logger.info(LocalizedMessage.create(
+          LocalizedStrings.AbstractGatewaySenderEventProcessor_SET_BATCH_SIZE, new Object[] { currentBatchSize, this.batchSize }));
+    }
+  }
+
   /**
    * Returns the current batch id to be used to identify the next batch.
    * 
@@ -387,7 +409,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     final boolean isTraceEnabled = logger.isTraceEnabled();
     
-    final int batchSize = sender.getBatchSize();
     final int batchTimeInterval = sender.getBatchTimeInterval();
     final GatewaySenderStats statistics = this.sender.getStatistics();
     
@@ -417,7 +438,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
 
         // Peek a batch
         if (isDebugEnabled) {
-          logger.debug("Attempting to peek a batch of {} events", batchSize);
+          logger.debug("Attempting to peek a batch of {} events", this.batchSize);
         }
         for (;;) {
           // check before sleeping
@@ -481,7 +502,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
               }
             }*/
             }
-            events = this.queue.peek(batchSize, batchTimeInterval);
+            events = this.queue.peek(this.batchSize, batchTimeInterval);
           } catch (InterruptedException e) {
             interrupted = true;
             this.sender.getCancelCriterion().checkCancelInProgress(e);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index c00903f..a9d0f3e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -54,7 +54,6 @@ import com.gemstone.gemfire.cache.EvictionAttributes;
 import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionAttributes;
-import com.gemstone.gemfire.cache.RegionDestroyedException;
 import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import com.gemstone.gemfire.distributed.internal.DM;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
@@ -143,7 +142,17 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   private static BatchRemovalThread removalThread = null;
 
   protected BlockingQueue<GatewaySenderEventImpl> peekedEvents = new LinkedBlockingQueue<GatewaySenderEventImpl>();
-  
+
+  /**
+   * The peekedEventsProcessing queue holds previously peeked events that are re-batched in smaller batches after the batch size is reduced due to a MessageTooLargeException.
+   */
+  private BlockingQueue<GatewaySenderEventImpl> peekedEventsProcessing = new LinkedBlockingQueue<GatewaySenderEventImpl>();
+
+  /**
+   * The peekedEventsProcessingInProgress boolean denotes that processing existing peeked events is in progress
+   */
+  private boolean peekedEventsProcessingInProgress = false;
+
   public final AbstractGatewaySender sender ;
   
   public static final int WAIT_CYCLE_SHADOW_BUCKET_LOAD = 10;
@@ -1147,6 +1156,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
   public void resetLastPeeked() {
     this.resetLastPeeked = true;
+
+    // Reset the in progress boolean and queue for peeked events in progress
+    this.peekedEventsProcessingInProgress = false;
+    this.peekedEventsProcessing.clear();
   }
   
   // Need to improve here.If first peek returns NULL then look in another bucket.
@@ -1283,19 +1296,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     long start = System.currentTimeMillis();
     long end = start + timeToWait;
 
-    if (this.resetLastPeeked) {
-      batch.addAll(peekedEvents);
-      this.resetLastPeeked = false;
-      if (isDebugEnabled) {
-        StringBuffer buffer = new StringBuffer();
-        for (GatewaySenderEventImpl ge : peekedEvents) {
-          buffer.append("event :");
-          buffer.append(ge);
-        }
-        logger.debug("Adding already peeked events to the batch {}", buffer);
-      }
-    }
-    
+    // Add peeked events
+    addPeekedEvents(batch, batchSize);
+
     int bId = -1;
     while (batch.size() < batchSize) {
       if (areLocalBucketQueueRegionsPresent()
@@ -1372,6 +1375,47 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     return batch;
   }
 
+  private void addPeekedEvents(List batch, int batchSize) {
+    if (this.resetLastPeeked) {
+      if (this.peekedEventsProcessingInProgress) {
+        // Peeked event processing is in progress. This means that the original peekedEvents
+        // contained > batch size events due to a reduction in the batch size. Create a batch
+        // from the peekedEventsProcessing queue.
+        addPreviouslyPeekedEvents(batch, batchSize);
+      } else if (peekedEvents.size() <= batchSize) {
+        // This is the normal case. The connection was lost while processing a batch.
+        // This recreates the batch from the current peekedEvents.
+        batch.addAll(peekedEvents);
+        this.resetLastPeeked = false;
+      } else {
+        // The peekedEvents queue is > batch size. This means that the previous batch size was
+        // reduced due to MessageTooLargeException. Create a batch from the peekedEventsProcessing queue.
+        this.peekedEventsProcessing.addAll(this.peekedEvents);
+        this.peekedEventsProcessingInProgress = true;
+        addPreviouslyPeekedEvents(batch, batchSize);
+      }
+      if (logger.isDebugEnabled()) {
+        StringBuffer buffer = new StringBuffer();
+        for (Object ge : batch) {
+          buffer.append("event :");
+          buffer.append(ge);
+        }
+        logger.debug("Adding already peeked events to the batch {}", buffer);
+      }
+    }
+  }
+
+  private void addPreviouslyPeekedEvents(List batch, int batchSize) {
+    for (int i=0; i<batchSize; i++) {
+      batch.add(this.peekedEventsProcessing.remove());
+      if (this.peekedEventsProcessing.isEmpty()) {
+        this.resetLastPeeked = false;
+        this.peekedEventsProcessingInProgress = false;
+        break;
+      }
+    }
+  }
+
   protected void blockProcesorThreadIfRequired() throws InterruptedException {
     queueEmptyLock.lock();
     try {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
index 8147718..3996692 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
@@ -2145,7 +2145,11 @@ public class LocalizedStrings extends ParentLocalizedStrings {
   public static final StringId AUTH_FAILED_TO_ACQUIRE_AUTHINITIALIZE_INSTANCE = new StringId(6611, "AuthInitialize instance could not be obtained");
   public static final StringId AUTH_FAILED_TO_OBTAIN_CREDENTIALS_IN_0_USING_AUTHINITIALIZE_1_2 = new StringId(6612, "Failed to obtain credentials using AuthInitialize [{1}]. {2}");
   public static final StringId DistributedSystem_BACKUP_ALREADY_IN_PROGRESS = new StringId(6613, "A backup is already in progress.");
-  
+
+  public static final StringId AbstractGatewaySenderEventProcessor_SET_BATCH_SIZE = new StringId(6614, "Set the batch size from {0} to {1} events");
+  public static final StringId AbstractGatewaySenderEventProcessor_ATTEMPT_TO_SET_BATCH_SIZE_FAILED = new StringId(6615, "Attempting to set the batch size from {0} to {1} events failed. Instead it was set to 1.");
+  public static final StringId GatewaySenderEventRemoteDispatcher_MESSAGE_TOO_LARGE_EXCEPTION = new StringId(6616, "The following exception occurred attempting to send a batch of {0} events. The batch will be tried again after reducing the batch size to {1} events.");
+
   /** Testing strings, messageId 90000-99999 **/
   
   /** These are simple messages for testing, translated with Babelfish. **/

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 9da6748..22dff3d 100644
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.gemstone.gemfire.GemFireIOException;
+import com.gemstone.gemfire.internal.cache.tier.sockets.MessageTooLargeException;
 import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.CancelException;
@@ -31,7 +33,6 @@ import com.gemstone.gemfire.cache.RegionDestroyedException;
 import com.gemstone.gemfire.cache.client.ServerConnectivityException;
 import com.gemstone.gemfire.cache.client.ServerOperationException;
 import com.gemstone.gemfire.cache.client.internal.Connection;
-import com.gemstone.gemfire.cache.client.internal.ServerProxy;
 import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException;
 import com.gemstone.gemfire.cache.wan.GatewaySender;
 import com.gemstone.gemfire.distributed.internal.ServerLocation;
@@ -151,7 +152,9 @@ public class GatewaySenderEventRemoteDispatcher implements
     try {
       long start = statistics.startTime();
       success =_dispatchBatch(events, isRetry);
-      statistics.endBatch(start, events.size());
+      if (success) {
+        statistics.endBatch(start, events.size());
+      }
     } catch (GatewaySenderException ge) {
 
       Throwable t = ge.getCause();
@@ -159,7 +162,8 @@ public class GatewaySenderEventRemoteDispatcher implements
         // if our pool is shutdown then just be silent
       } else if (t instanceof IOException
           || t instanceof ServerConnectivityException
-          || t instanceof ConnectionDestroyedException) {
+          || t instanceof ConnectionDestroyedException
+          || t instanceof MessageTooLargeException) {
         this.processor.handleException();
         // If the cause is an IOException or a ServerException, sleep and retry.
         // Sleep for a bit and recheck.
@@ -243,6 +247,27 @@ public class GatewaySenderEventRemoteDispatcher implements
           LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(
               new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex);
     }
+    catch (GemFireIOException e) {
+      Throwable t = e.getCause();
+      if (t instanceof MessageTooLargeException) {
+        // A MessageTooLargeException has occurred.
+        // Do not process the connection as dead since it is not dead.
+        ex = (MessageTooLargeException)t;
+        // Reduce the batch size to half of the processor's current batch size or the number of events in the current batch (whichever is less)
+        int newBatchSize = Math.min(events.size(), this.processor.getBatchSize())/2;
+        logger.warn(LocalizedMessage.create(
+            LocalizedStrings.GatewaySenderEventRemoteDispatcher_MESSAGE_TOO_LARGE_EXCEPTION, new Object[] { events.size(), newBatchSize }), e);
+        this.processor.setBatchSize(newBatchSize);
+      }
+      else {
+        ex = e;
+        // The connection is kept only for a MessageTooLargeException; for any other cause, destroy it
+        destroyConnection();
+      }
+      throw new GatewaySenderException(
+          LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(
+              new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex);
+    }
     catch (Exception e) {
       // An Exception has occurred. Get its cause.
       Throwable t = e.getCause();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
index 0a1a7ef..5da6b5c 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
@@ -2257,7 +2257,7 @@ public class WANTestBase extends DistributedTestCase{
   public static void createSender(String dsName, int remoteDsId,
       boolean isParallel, Integer maxMemory,
       Integer batchSize, boolean isConflation, boolean isPersistent,
-      GatewayEventFilter filter, boolean isManulaStart) {
+      GatewayEventFilter filter, boolean isManualStart) {
     final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
     try {
       File persistentDirectory = new File(dsName + "_disk_"
@@ -2270,7 +2270,7 @@ public class WANTestBase extends DistributedTestCase{
         gateway.setParallel(true);
         gateway.setMaximumQueueMemory(maxMemory);
         gateway.setBatchSize(batchSize);
-        gateway.setManualStart(isManulaStart);
+        gateway.setManualStart(isManualStart);
         //set dispatcher threads
         gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
         ((InternalGatewaySenderFactory) gateway)
@@ -2294,7 +2294,7 @@ public class WANTestBase extends DistributedTestCase{
         GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
         gateway.setMaximumQueueMemory(maxMemory);
         gateway.setBatchSize(batchSize);
-        gateway.setManualStart(isManulaStart);
+        gateway.setManualStart(isManualStart);
         //set dispatcher threads
         gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
         ((InternalGatewaySenderFactory) gateway)
@@ -3056,7 +3056,25 @@ public class WANTestBase extends DistributedTestCase{
 //      r.destroy(i);
 //    }
   }
-  
+
+
+  public static void doPuts(String regionName, int numPuts, Object value) {
+    IgnoredException exp1 = IgnoredException.addIgnoredException(InterruptedException.class
+        .getName());
+    IgnoredException exp2 = IgnoredException.addIgnoredException(GatewaySenderException.class
+        .getName());
+    try {
+      Region r = cache.getRegion(Region.SEPARATOR + regionName);
+      assertNotNull(r);
+      for (long i = 0; i < numPuts; i++) {
+        r.put(i, value);
+      }
+    } finally {
+      exp1.remove();
+      exp2.remove();
+    }
+  }
+
   public static void doPuts(String regionName, int numPuts) {
     IgnoredException exp1 = IgnoredException.addIgnoredException(InterruptedException.class
         .getName());

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index 9e1b28c..f929d89 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -16,8 +16,10 @@
  */
 package com.gemstone.gemfire.internal.cache.wan.parallel;
 
+import com.gemstone.gemfire.GemFireIOException;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.internal.cache.tier.sockets.MessageTooLargeException;
 import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
 import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
 import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
@@ -555,6 +557,39 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
     vm7.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", true ));
   }
 
+  public void testParallelGatewaySenderMessageTooLargeException() {
+    Integer[] locatorPorts = createLNAndNYLocators();
+    Integer lnPort = locatorPorts[0];
+    Integer nyPort = locatorPorts[1];
+
+    // Create and start sender with reduced maximum message size and 1 dispatcher thread
+    String regionName = getTestMethodName() + "_PR";
+    vm4.invoke(() -> setMaximumMessageSize( 1024*1024 ));
+    vm4.invoke(() -> createCache( lnPort ));
+    vm4.invoke(() -> setNumDispatcherThreadsForTheRun( 1 ));
+    vm4.invoke(() -> createSender( "ln", 2, true, 100, 100, false, false, null, false ));
+    vm4.invoke(() -> createPartitionedRegion( regionName, "ln", 0, 100, isOffHeap() ));
+
+    // Do puts
+    int numPuts = 200;
+    vm4.invoke(() -> doPuts( regionName, numPuts, new byte[11000] ));
+    validateRegionSizes(regionName, numPuts, vm4);
+
+    // Start receiver
+    IgnoredException ignoredMTLE = IgnoredException.addIgnoredException(MessageTooLargeException.class.getName(), vm4);
+    IgnoredException ignoredGIOE = IgnoredException.addIgnoredException(GemFireIOException.class.getName(), vm4);
+    vm2.invoke(() -> createReceiver( nyPort ));
+    vm2.invoke(() -> createPartitionedRegion( regionName, null, 0, 100, isOffHeap() ));
+    validateRegionSizes( regionName, numPuts, vm2 );
+    ignoredMTLE.remove();
+    ignoredGIOE.remove();
+  }
+
+  private void setMaximumMessageSize(int maximumMessageSizeBytes) {
+    System.setProperty("gemfire.client.max-message-size", String.valueOf(maximumMessageSizeBytes));
+    LogWriterUtils.getLogWriter().info("Set gemfire.client.max-message-size: " + System.getProperty("gemfire.client.max-message-size"));
+  }
+
   private void createSendersReceiversAndPartitionedRegion(Integer lnPort, Integer nyPort, boolean createAccessors,
       boolean startSenders) {
     // Note: This is a test-specific method used by several test to create