You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2019/02/25 23:56:43 UTC

[geode] branch develop updated: GEODE-6435: Reduced gateway sender socket buffer size

This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 461bb48  GEODE-6435: Reduced gateway sender socket buffer size
461bb48 is described below

commit 461bb483b7eba2127eef9e690aae9a56c2dc5071
Author: Barry Oglesby <bo...@users.noreply.github.com>
AuthorDate: Mon Feb 25 15:56:21 2019 -0800

    GEODE-6435: Reduced gateway sender socket buffer size
---
 .../geode/internal/cache/wan/WANTestBase.java       | 21 +++++++++++++++++----
 .../serial/SerialGatewaySenderQueueDUnitTest.java   |  6 +++---
 2 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 70fbe1c..1512441 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -1679,7 +1679,7 @@ public class WANTestBase extends DistributedTestCase {
   public static GatewaySenderFactory configureGateway(DiskStoreFactory dsf, File[] dirs1,
       String dsName, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation,
       boolean isPersistent, GatewayEventFilter filter, boolean isManualStart, int numDispatchers,
-      OrderPolicy policy) {
+      OrderPolicy policy, int socketBufferSize) {
 
     InternalGatewaySenderFactory gateway =
         (InternalGatewaySenderFactory) cache.createGatewaySenderFactory();
@@ -1691,6 +1691,7 @@ public class WANTestBase extends DistributedTestCase {
     gateway.setDispatcherThreads(numDispatchers);
     gateway.setOrderPolicy(policy);
     gateway.setLocatorDiscoveryCallback(new MyLocatorCallback());
+    gateway.setSocketBufferSize(socketBufferSize);
     if (filter != null) {
       eventFilter = filter;
       gateway.addGatewayEventFilter(filter);
@@ -1717,7 +1718,8 @@ public class WANTestBase extends DistributedTestCase {
       File[] dirs1 = new File[] {persistentDirectory};
       GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, isParallel, maxMemory,
           batchSize, isConflation, isPersistent, filter, isManualStart,
-          numDispatcherThreadsForTheRun, GatewaySender.DEFAULT_ORDER_POLICY);
+          numDispatcherThreadsForTheRun, GatewaySender.DEFAULT_ORDER_POLICY,
+          GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE);
       gateway.create(dsName, remoteDsId);
 
     } finally {
@@ -1729,6 +1731,16 @@ public class WANTestBase extends DistributedTestCase {
       boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation,
       boolean isPersistent, GatewayEventFilter filter, boolean isManualStart, int numDispatchers,
       OrderPolicy orderPolicy) {
+    createSenderWithMultipleDispatchers(dsName, remoteDsId,
+        isParallel, maxMemory, batchSize, isConflation,
+        isPersistent, filter, isManualStart, numDispatchers,
+        orderPolicy, GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE);
+  }
+
+  public static void createSenderWithMultipleDispatchers(String dsName, int remoteDsId,
+      boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation,
+      boolean isPersistent, GatewayEventFilter filter, boolean isManualStart, int numDispatchers,
+      OrderPolicy orderPolicy, int socketBufferSize) {
     final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
     try {
       File persistentDirectory =
@@ -1738,7 +1750,7 @@ public class WANTestBase extends DistributedTestCase {
       File[] dirs1 = new File[] {persistentDirectory};
       GatewaySenderFactory gateway =
           configureGateway(dsf, dirs1, dsName, isParallel, maxMemory, batchSize, isConflation,
-              isPersistent, filter, isManualStart, numDispatchers, orderPolicy);
+              isPersistent, filter, isManualStart, numDispatchers, orderPolicy, socketBufferSize);
       gateway.create(dsName, remoteDsId);
 
     } finally {
@@ -1770,7 +1782,8 @@ public class WANTestBase extends DistributedTestCase {
     DiskStoreFactory dsf = cache.createDiskStoreFactory();
     File[] dirs1 = new File[] {persistentDirectory};
     GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, isParallel, maxMemory,
-        batchSize, isConflation, isPersistent, filter, isManualStart, concurrencyLevel, policy);
+        batchSize, isConflation, isPersistent, filter, isManualStart, concurrencyLevel, policy,
+        GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE);
     gateway.create(dsName, remoteDsId);
   }
 
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
index e5359f8..01e05fc 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
@@ -355,7 +355,7 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase {
         builder.append(',');
       }
       vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers(senderId, 2, false, 100, 10,
-          false, false, null, false, 1, OrderPolicy.KEY));
+          false, false, null, false, 1, OrderPolicy.KEY, 32768));
     }
 
     // Create region with the sender ids
@@ -390,7 +390,7 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase {
     for (int i = 0; i < ThreadIdentifier.Bits.GATEWAY_ID.mask() + 1; i++) {
       String senderId = "ln-" + i;
       vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers(senderId, 2, false, 100, 10,
-          false, false, null, false, 1, OrderPolicy.KEY));
+          false, false, null, false, 1, OrderPolicy.KEY, 32768));
     }
 
     // Attempt to create one more sender
@@ -402,7 +402,7 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase {
         IgnoredException.addIgnoredException(IllegalStateException.class.getName());
     try {
       createSenderWithMultipleDispatchers("ln-one-too-many", 2, false, 100, 10, false, false, null,
-          false, 1, OrderPolicy.KEY);
+          false, 1, OrderPolicy.KEY, 32768);
       fail("Should not have been able to create gateway sender");
     } catch (IllegalStateException e) {
       /* ignore expected exception */