You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/03/17 23:58:09 UTC
[09/33] incubator-geode git commit: GEODE-478: GatewaySender now
handles MessageTooLargeExceptions
GEODE-478: GatewaySender now handles MessageTooLargeExceptions
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/a904f147
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/a904f147
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/a904f147
Branch: refs/heads/feature/GEODE-1050
Commit: a904f1474ec3153bc39f650d49731214f25c6230
Parents: 4d0dfc5
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Tue Mar 8 15:55:34 2016 -0800
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Wed Mar 16 09:56:41 2016 -0700
----------------------------------------------------------------------
.../internal/cache/tier/sockets/Message.java | 2 +-
.../AbstractGatewaySenderEventProcessor.java | 33 +++++++--
.../parallel/ParallelGatewaySenderQueue.java | 74 ++++++++++++++++----
.../gemfire/internal/i18n/LocalizedStrings.java | 6 +-
.../wan/GatewaySenderEventRemoteDispatcher.java | 31 +++++++-
.../gemfire/internal/cache/wan/WANTestBase.java | 26 +++++--
...arallelGatewaySenderOperationsDUnitTest.java | 35 +++++++++
7 files changed, 177 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
index a6495e2..44c88c1 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
@@ -564,7 +564,7 @@ public class Message {
msgLen = (int)(headerLen + totalPartLen);
if (msgLen > MAX_MESSAGE_SIZE) {
- throw new MessageTooLargeException("Message size(" + msgLen
+ throw new MessageTooLargeException("Message size (" + msgLen
+ ") exceeds gemfire.client.max-message-size setting (" + MAX_MESSAGE_SIZE + ")");
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 86ecce1..51b125a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -38,12 +38,9 @@ import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionDestroyedException;
-import com.gemstone.gemfire.cache.client.internal.Connection;
-import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException;
import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.cache.BucketRegion;
import com.gemstone.gemfire.internal.cache.Conflatable;
import com.gemstone.gemfire.internal.cache.DistributedRegion;
@@ -143,6 +140,13 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
private volatile boolean resetLastPeekedEvents;
private long numEventsDispatched;
+
+ /**
+ * The batchSize is the batch size being used by this processor. By default, it is the
+ * configured batch size of the GatewaySender. It may be automatically reduced if a
+ * MessageTooLargeException occurs.
+ */
+ private int batchSize;
/**
* @param createThreadGroup
@@ -152,6 +156,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
String string, GatewaySender sender) {
super(createThreadGroup, string);
this.sender = (AbstractGatewaySender)sender;
+ this.batchSize = sender.getBatchSize();
}
abstract protected void initializeMessageQueue(String id);
@@ -214,6 +219,23 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
this.resetLastPeekedEvents = true;
}
+ protected int getBatchSize() {
+ return this.batchSize;
+ }
+
+ protected void setBatchSize(int batchSize) {
+ int currentBatchSize = this.batchSize;
+ if (batchSize <= 0) {
+ this.batchSize = 1;
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.AbstractGatewaySenderEventProcessor_ATTEMPT_TO_SET_BATCH_SIZE_FAILED, new Object[] { currentBatchSize, batchSize }));
+ } else {
+ this.batchSize = batchSize;
+ logger.info(LocalizedMessage.create(
+ LocalizedStrings.AbstractGatewaySenderEventProcessor_SET_BATCH_SIZE, new Object[] { currentBatchSize, this.batchSize }));
+ }
+ }
+
/**
* Returns the current batch id to be used to identify the next batch.
*
@@ -387,7 +409,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
final boolean isDebugEnabled = logger.isDebugEnabled();
final boolean isTraceEnabled = logger.isTraceEnabled();
- final int batchSize = sender.getBatchSize();
final int batchTimeInterval = sender.getBatchTimeInterval();
final GatewaySenderStats statistics = this.sender.getStatistics();
@@ -417,7 +438,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
// Peek a batch
if (isDebugEnabled) {
- logger.debug("Attempting to peek a batch of {} events", batchSize);
+ logger.debug("Attempting to peek a batch of {} events", this.batchSize);
}
for (;;) {
// check before sleeping
@@ -481,7 +502,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
}
}*/
}
- events = this.queue.peek(batchSize, batchTimeInterval);
+ events = this.queue.peek(this.batchSize, batchTimeInterval);
} catch (InterruptedException e) {
interrupted = true;
this.sender.getCancelCriterion().checkCancelInProgress(e);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index c00903f..a9d0f3e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -54,7 +54,6 @@ import com.gemstone.gemfire.cache.EvictionAttributes;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
-import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
@@ -143,7 +142,17 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
private static BatchRemovalThread removalThread = null;
protected BlockingQueue<GatewaySenderEventImpl> peekedEvents = new LinkedBlockingQueue<GatewaySenderEventImpl>();
-
+
+ /**
+ * The peekedEventsProcessing queue is used when the batch size is reduced due to a MessageTooLargeException
+ */
+ private BlockingQueue<GatewaySenderEventImpl> peekedEventsProcessing = new LinkedBlockingQueue<GatewaySenderEventImpl>();
+
+ /**
+ * The peekedEventsProcessingInProgress boolean denotes that processing existing peeked events is in progress
+ */
+ private boolean peekedEventsProcessingInProgress = false;
+
public final AbstractGatewaySender sender ;
public static final int WAIT_CYCLE_SHADOW_BUCKET_LOAD = 10;
@@ -1147,6 +1156,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
public void resetLastPeeked() {
this.resetLastPeeked = true;
+
+ // Reset the in progress boolean and queue for peeked events in progress
+ this.peekedEventsProcessingInProgress = false;
+ this.peekedEventsProcessing.clear();
}
// Need to improve here.If first peek returns NULL then look in another bucket.
@@ -1283,19 +1296,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
long start = System.currentTimeMillis();
long end = start + timeToWait;
- if (this.resetLastPeeked) {
- batch.addAll(peekedEvents);
- this.resetLastPeeked = false;
- if (isDebugEnabled) {
- StringBuffer buffer = new StringBuffer();
- for (GatewaySenderEventImpl ge : peekedEvents) {
- buffer.append("event :");
- buffer.append(ge);
- }
- logger.debug("Adding already peeked events to the batch {}", buffer);
- }
- }
-
+ // Add peeked events
+ addPeekedEvents(batch, batchSize);
+
int bId = -1;
while (batch.size() < batchSize) {
if (areLocalBucketQueueRegionsPresent()
@@ -1372,6 +1375,47 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
return batch;
}
+ private void addPeekedEvents(List batch, int batchSize) {
+ if (this.resetLastPeeked) {
+ if (this.peekedEventsProcessingInProgress) {
+ // Peeked event processing is in progress. This means that the original peekedEvents
+ // contained > batch size events due to a reduction in the batch size. Create a batch
+ // from the peekedEventsProcessing queue.
+ addPreviouslyPeekedEvents(batch, batchSize);
+ } else if (peekedEvents.size() <= batchSize) {
+ // This is the normal case. The connection was lost while processing a batch.
+ // This recreates the batch from the current peekedEvents.
+ batch.addAll(peekedEvents);
+ this.resetLastPeeked = false;
+ } else {
+ // The peekedEvents queue is > batch size. This means that the previous batch size was
+ // reduced due to MessageTooLargeException. Create a batch from the peekedEventsProcessing queue.
+ this.peekedEventsProcessing.addAll(this.peekedEvents);
+ this.peekedEventsProcessingInProgress = true;
+ addPreviouslyPeekedEvents(batch, batchSize);
+ }
+ if (logger.isDebugEnabled()) {
+ StringBuffer buffer = new StringBuffer();
+ for (Object ge : batch) {
+ buffer.append("event :");
+ buffer.append(ge);
+ }
+ logger.debug("Adding already peeked events to the batch {}", buffer);
+ }
+ }
+ }
+
+ private void addPreviouslyPeekedEvents(List batch, int batchSize) {
+ for (int i=0; i<batchSize; i++) {
+ batch.add(this.peekedEventsProcessing.remove());
+ if (this.peekedEventsProcessing.isEmpty()) {
+ this.resetLastPeeked = false;
+ this.peekedEventsProcessingInProgress = false;
+ break;
+ }
+ }
+ }
+
protected void blockProcesorThreadIfRequired() throws InterruptedException {
queueEmptyLock.lock();
try {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
index 8147718..3996692 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
@@ -2145,7 +2145,11 @@ public class LocalizedStrings extends ParentLocalizedStrings {
public static final StringId AUTH_FAILED_TO_ACQUIRE_AUTHINITIALIZE_INSTANCE = new StringId(6611, "AuthInitialize instance could not be obtained");
public static final StringId AUTH_FAILED_TO_OBTAIN_CREDENTIALS_IN_0_USING_AUTHINITIALIZE_1_2 = new StringId(6612, "Failed to obtain credentials using AuthInitialize [{1}]. {2}");
public static final StringId DistributedSystem_BACKUP_ALREADY_IN_PROGRESS = new StringId(6613, "A backup is already in progress.");
-
+
+ public static final StringId AbstractGatewaySenderEventProcessor_SET_BATCH_SIZE = new StringId(6614, "Set the batch size from {0} to {1} events");
+ public static final StringId AbstractGatewaySenderEventProcessor_ATTEMPT_TO_SET_BATCH_SIZE_FAILED = new StringId(6615, "Attempting to set the batch size from {0} to {1} events failed. Instead it was set to 1.");
+ public static final StringId GatewaySenderEventRemoteDispatcher_MESSAGE_TOO_LARGE_EXCEPTION = new StringId(6616, "The following exception occurred attempting to send a batch of {0} events. The batch will be tried again after reducing the batch size to {1} events.");
+
/** Testing strings, messageId 90000-99999 **/
/** These are simple messages for testing, translated with Babelfish. **/
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 9da6748..22dff3d 100644
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -23,6 +23,8 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.gemstone.gemfire.GemFireIOException;
+import com.gemstone.gemfire.internal.cache.tier.sockets.MessageTooLargeException;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CancelException;
@@ -31,7 +33,6 @@ import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.client.ServerConnectivityException;
import com.gemstone.gemfire.cache.client.ServerOperationException;
import com.gemstone.gemfire.cache.client.internal.Connection;
-import com.gemstone.gemfire.cache.client.internal.ServerProxy;
import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException;
import com.gemstone.gemfire.cache.wan.GatewaySender;
import com.gemstone.gemfire.distributed.internal.ServerLocation;
@@ -151,7 +152,9 @@ public class GatewaySenderEventRemoteDispatcher implements
try {
long start = statistics.startTime();
success =_dispatchBatch(events, isRetry);
- statistics.endBatch(start, events.size());
+ if (success) {
+ statistics.endBatch(start, events.size());
+ }
} catch (GatewaySenderException ge) {
Throwable t = ge.getCause();
@@ -159,7 +162,8 @@ public class GatewaySenderEventRemoteDispatcher implements
// if our pool is shutdown then just be silent
} else if (t instanceof IOException
|| t instanceof ServerConnectivityException
- || t instanceof ConnectionDestroyedException) {
+ || t instanceof ConnectionDestroyedException
+ || t instanceof MessageTooLargeException) {
this.processor.handleException();
// If the cause is an IOException or a ServerException, sleep and retry.
// Sleep for a bit and recheck.
@@ -243,6 +247,27 @@ public class GatewaySenderEventRemoteDispatcher implements
LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(
new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex);
}
+ catch (GemFireIOException e) {
+ Throwable t = e.getCause();
+ if (t instanceof MessageTooLargeException) {
+ // A MessageTooLargeException has occurred.
+ // Do not process the connection as dead since it is not dead.
+ ex = (MessageTooLargeException)t;
+ // Reduce the batch size by half of the configured batch size or number of events in the current batch (whichever is less)
+ int newBatchSize = Math.min(events.size(), this.processor.getBatchSize())/2;
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.GatewaySenderEventRemoteDispatcher_MESSAGE_TOO_LARGE_EXCEPTION, new Object[] { events.size(), newBatchSize }), e);
+ this.processor.setBatchSize(newBatchSize);
+ }
+ else {
+ ex = e;
+ // keep using the connection if we had a MessageTooLargeException. Else, destroy it
+ destroyConnection();
+ }
+ throw new GatewaySenderException(
+ LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(
+ new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex);
+ }
catch (Exception e) {
// An Exception has occurred. Get its cause.
Throwable t = e.getCause();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
index 0a1a7ef..5da6b5c 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
@@ -2257,7 +2257,7 @@ public class WANTestBase extends DistributedTestCase{
public static void createSender(String dsName, int remoteDsId,
boolean isParallel, Integer maxMemory,
Integer batchSize, boolean isConflation, boolean isPersistent,
- GatewayEventFilter filter, boolean isManulaStart) {
+ GatewayEventFilter filter, boolean isManualStart) {
final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
try {
File persistentDirectory = new File(dsName + "_disk_"
@@ -2270,7 +2270,7 @@ public class WANTestBase extends DistributedTestCase{
gateway.setParallel(true);
gateway.setMaximumQueueMemory(maxMemory);
gateway.setBatchSize(batchSize);
- gateway.setManualStart(isManulaStart);
+ gateway.setManualStart(isManualStart);
//set dispatcher threads
gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
((InternalGatewaySenderFactory) gateway)
@@ -2294,7 +2294,7 @@ public class WANTestBase extends DistributedTestCase{
GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
gateway.setMaximumQueueMemory(maxMemory);
gateway.setBatchSize(batchSize);
- gateway.setManualStart(isManulaStart);
+ gateway.setManualStart(isManualStart);
//set dispatcher threads
gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
((InternalGatewaySenderFactory) gateway)
@@ -3056,7 +3056,25 @@ public class WANTestBase extends DistributedTestCase{
// r.destroy(i);
// }
}
-
+
+
+ public static void doPuts(String regionName, int numPuts, Object value) {
+ IgnoredException exp1 = IgnoredException.addIgnoredException(InterruptedException.class
+ .getName());
+ IgnoredException exp2 = IgnoredException.addIgnoredException(GatewaySenderException.class
+ .getName());
+ try {
+ Region r = cache.getRegion(Region.SEPARATOR + regionName);
+ assertNotNull(r);
+ for (long i = 0; i < numPuts; i++) {
+ r.put(i, value);
+ }
+ } finally {
+ exp1.remove();
+ exp2.remove();
+ }
+ }
+
public static void doPuts(String regionName, int numPuts) {
IgnoredException exp1 = IgnoredException.addIgnoredException(InterruptedException.class
.getName());
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index 9e1b28c..f929d89 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -16,8 +16,10 @@
*/
package com.gemstone.gemfire.internal.cache.wan.parallel;
+import com.gemstone.gemfire.GemFireIOException;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.internal.cache.tier.sockets.MessageTooLargeException;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
@@ -555,6 +557,39 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
vm7.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", true ));
}
+ public void testParallelGatewaySenderMessageTooLargeException() {
+ Integer[] locatorPorts = createLNAndNYLocators();
+ Integer lnPort = locatorPorts[0];
+ Integer nyPort = locatorPorts[1];
+
+ // Create and start sender with reduced maximum message size and 1 dispatcher thread
+ String regionName = getTestMethodName() + "_PR";
+ vm4.invoke(() -> setMaximumMessageSize( 1024*1024 ));
+ vm4.invoke(() -> createCache( lnPort ));
+ vm4.invoke(() -> setNumDispatcherThreadsForTheRun( 1 ));
+ vm4.invoke(() -> createSender( "ln", 2, true, 100, 100, false, false, null, false ));
+ vm4.invoke(() -> createPartitionedRegion( regionName, "ln", 0, 100, isOffHeap() ));
+
+ // Do puts
+ int numPuts = 200;
+ vm4.invoke(() -> doPuts( regionName, numPuts, new byte[11000] ));
+ validateRegionSizes(regionName, numPuts, vm4);
+
+ // Start receiver
+ IgnoredException ignoredMTLE = IgnoredException.addIgnoredException(MessageTooLargeException.class.getName(), vm4);
+ IgnoredException ignoredGIOE = IgnoredException.addIgnoredException(GemFireIOException.class.getName(), vm4);
+ vm2.invoke(() -> createReceiver( nyPort ));
+ vm2.invoke(() -> createPartitionedRegion( regionName, null, 0, 100, isOffHeap() ));
+ validateRegionSizes( regionName, numPuts, vm2 );
+ ignoredMTLE.remove();
+ ignoredGIOE.remove();
+ }
+
+ private void setMaximumMessageSize(int maximumMessageSizeBytes) {
+ System.setProperty("gemfire.client.max-message-size", String.valueOf(maximumMessageSizeBytes));
+ LogWriterUtils.getLogWriter().info("Set gemfire.client.max-message-size: " + System.getProperty("gemfire.client.max-message-size"));
+ }
+
private void createSendersReceiversAndPartitionedRegion(Integer lnPort, Integer nyPort, boolean createAccessors,
boolean startSenders) {
// Note: This is a test-specific method used by several test to create