You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2016/12/15 18:35:15 UTC

geode git commit: GEODE-2209: Added support for up to 128 GatewaySenders

Repository: geode
Updated Branches:
  refs/heads/develop 41774b3f6 -> b0fbf72f0


GEODE-2209: Added support for up to 128 GatewaySenders


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/b0fbf72f
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/b0fbf72f
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/b0fbf72f

Branch: refs/heads/develop
Commit: b0fbf72f043bd73a94a6d4df36a44bfef91efce8
Parents: 41774b3
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Tue Dec 13 17:30:23 2016 -0800
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Thu Dec 15 10:29:02 2016 -0800

----------------------------------------------------------------------
 .../internal/cache/ha/ThreadIdentifier.java     |  9 +-
 .../cache/wan/AbstractGatewaySender.java        |  6 ++
 .../geode/internal/i18n/LocalizedStrings.java   |  4 +
 .../geode/internal/cache/wan/QueueListener.java |  4 +
 .../SerialGatewaySenderQueueDUnitTest.java      | 90 ++++++++++++++++++++
 5 files changed, 109 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/b0fbf72f/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java
index 242510a..ec165a5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java
@@ -89,12 +89,12 @@ public class ThreadIdentifier implements DataSerializable {
    * Provides type-safe bitwise access to the threadID when dealing with generated values for wan id
    * generation.
    */
-  protected enum Bits {
+  public enum Bits {
     THREAD_ID(0, 32), // bits 0-31 thread id (including fake putAll bits)
     WAN(32, 16), // bits 32-47 wan thread index (or bucket for new wan)
     WAN_TYPE(48, 8), // bits 48-55 thread id type
-    GATEWAY_ID(56, 4), // bits 56-59 gateway id
-    RESERVED(60, 4); // bits 60-63 unused
+    GATEWAY_ID(56, 7), // bits 56-62 gateway id (bit 63 would make the thread id negative)
+    RESERVED(63, 1); // bit 63 unused
 
     /** the beginning bit position */
     private final int position;
@@ -123,7 +123,8 @@ public class ThreadIdentifier implements DataSerializable {
      * @return the shifted value
      */
     public long shift(long val) {
-      assert val <= mask();
+      assert val <= mask() : "Input value " + val + " is too large for " + this
+          + " which has a maximum of " + mask();
       return val << position;
     }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/b0fbf72f/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index c4d503e..2c9a65d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelCriterion;
@@ -1117,6 +1118,11 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
           }
         } else {
           index = region.size();
+          if (index > ThreadIdentifier.Bits.GATEWAY_ID.mask()) {
+            throw new IllegalStateException(
+                LocalizedStrings.AbstractGatewaySender_CANNOT_CREATE_SENDER_0_BECAUSE_MAXIMUM_1_HAS_BEEN_REACHED
+                    .toLocalizedString(getId(), ThreadIdentifier.Bits.GATEWAY_ID.mask() + 1));
+          }
           region.put(getId(), index);
           if (isDebugEnabled) {
             messagePrefix = "Created new";

http://git-wip-us.apache.org/repos/asf/geode/blob/b0fbf72f/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
index 48b8a14..26a41ec 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
@@ -7661,6 +7661,10 @@ public class LocalizedStrings {
       new StringId(6646,
           "An unexpected exception occurred processing a BatchException. The thread will continue.");
 
+  public static final StringId AbstractGatewaySender_CANNOT_CREATE_SENDER_0_BECAUSE_MAXIMUM_1_HAS_BEEN_REACHED =
+      new StringId(6647,
+          "Cannot create GatewaySender {0} because the maximum ({1}) has been reached");
+
   /** Testing strings, messageId 90000-99999 **/
 
   /**

http://git-wip-us.apache.org/repos/asf/geode/blob/b0fbf72f/geode-core/src/test/java/org/apache/geode/internal/cache/wan/QueueListener.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/QueueListener.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/QueueListener.java
index 32032de..34b710d 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/QueueListener.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/QueueListener.java
@@ -69,6 +69,10 @@ public class QueueListener implements CacheListener {
     updateList.add(event.getKey());
   }
 
+  public int getNumEvents() {
+    return this.createList.size() + this.updateList.size() + this.destroyList.size();
+  }
+
   public void close() {
     // TODO Auto-generated method stub
 

http://git-wip-us.apache.org/repos/asf/geode/blob/b0fbf72f/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
index 34acfd4..7417c80 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
@@ -22,7 +22,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
+import com.jayway.awaitility.Awaitility;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -41,6 +43,7 @@ import org.apache.geode.cache30.MyGatewayEventFilter1;
 import org.apache.geode.cache30.MyGatewayTransportFilter1;
 import org.apache.geode.cache30.MyGatewayTransportFilter2;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.WANTestBase;
@@ -315,4 +318,91 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase {
     }
   }
 
+  /**
+   * Test to validate that the maximum number of senders can be created and used successfully.
+   */
+  @Test
+  public void testCreateMaximumSenders() {
+    // Create locators
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    // Create receiver and region
+    vm2.invoke(() -> WANTestBase.createCache(nyPort));
+    vm2.invoke(
+        () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap()));
+    vm2.invoke(() -> WANTestBase.createReceiver());
+    vm2.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_RR"));
+
+    // Create maximum number of senders
+    vm4.invoke(() -> WANTestBase.createCache(lnPort));
+    StringBuilder builder = new StringBuilder();
+    long maxSenders = ThreadIdentifier.Bits.GATEWAY_ID.mask() + 1;
+    for (int i = 0; i < maxSenders; i++) {
+      String senderId = "ln-" + i;
+      builder.append(senderId);
+      if (i + 1 != maxSenders) {
+        builder.append(',');
+      }
+      vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers(senderId, 2, false, 100, 10,
+          false, false, null, false, 1, OrderPolicy.KEY));
+    }
+
+    // Create region with the sender ids
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR",
+        builder.toString(), isOffHeap()));
+
+    // Do puts
+    int numPuts = 100;
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", numPuts));
+
+    // Verify receiver listener events
+    vm2.invoke(() -> SerialGatewaySenderQueueDUnitTest.verifyListenerEvents(maxSenders * numPuts));
+  }
+
+  private static void verifyListenerEvents(final long expectedNumEvents) {
+    Awaitility.await().atMost(60, TimeUnit.SECONDS)
+        .until(() -> listener1.getNumEvents() == expectedNumEvents);
+  }
+
+  /**
+   * Test to validate that the maximum number of senders plus one fails to be created.
+   */
+  @Test
+  public void testCreateMaximumPlusOneSenders() {
+    // Create locators
+    Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    // Create receiver
+    vm2.invoke(() -> WANTestBase.createCache(nyPort));
+    vm2.invoke(
+        () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap()));
+    vm2.invoke(() -> WANTestBase.createReceiver());
+
+    // Create maximum number of senders
+    vm4.invoke(() -> WANTestBase.createCache(lnPort));
+    for (int i = 0; i < ThreadIdentifier.Bits.GATEWAY_ID.mask() + 1; i++) {
+      String senderId = "ln-" + i;
+      vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers(senderId, 2, false, 100, 10,
+          false, false, null, false, 1, OrderPolicy.KEY));
+    }
+
+    // Attempt to create one more sender
+    vm4.invoke(() -> SerialGatewaySenderQueueDUnitTest.attemptToCreateGatewaySenderOverLimit());
+  }
+
+  private static void attemptToCreateGatewaySenderOverLimit() {
+    IgnoredException exp =
+        IgnoredException.addIgnoredException(IllegalStateException.class.getName());
+    try {
+      createSenderWithMultipleDispatchers("ln-one-too-many", 2, false, 100, 10, false, false, null,
+          false, 1, OrderPolicy.KEY);
+      fail("Should not have been able to create gateway sender");
+    } catch (IllegalStateException e) {
+      /* ignore expected exception */
+    } finally {
+      exp.remove();
+    }
+  }
 }