You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2016/12/15 18:35:15 UTC
geode git commit: GEODE-2209: Added support for up to 128
GatewaySenders
Repository: geode
Updated Branches:
refs/heads/develop 41774b3f6 -> b0fbf72f0
GEODE-2209: Added support for up to 128 GatewaySenders
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/b0fbf72f
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/b0fbf72f
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/b0fbf72f
Branch: refs/heads/develop
Commit: b0fbf72f043bd73a94a6d4df36a44bfef91efce8
Parents: 41774b3
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Tue Dec 13 17:30:23 2016 -0800
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Thu Dec 15 10:29:02 2016 -0800
----------------------------------------------------------------------
.../internal/cache/ha/ThreadIdentifier.java | 9 +-
.../cache/wan/AbstractGatewaySender.java | 6 ++
.../geode/internal/i18n/LocalizedStrings.java | 4 +
.../geode/internal/cache/wan/QueueListener.java | 4 +
.../SerialGatewaySenderQueueDUnitTest.java | 90 ++++++++++++++++++++
5 files changed, 109 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/b0fbf72f/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java
index 242510a..ec165a5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/ThreadIdentifier.java
@@ -89,12 +89,12 @@ public class ThreadIdentifier implements DataSerializable {
* Provides type-safe bitwise access to the threadID when dealing with generated values for wan id
* generation.
*/
- protected enum Bits {
+ public enum Bits {
THREAD_ID(0, 32), // bits 0-31 thread id (including fake putAll bits)
WAN(32, 16), // bits 32-47 wan thread index (or bucket for new wan)
WAN_TYPE(48, 8), // bits 48-55 thread id type
- GATEWAY_ID(56, 4), // bits 56-59 gateway id
- RESERVED(60, 4); // bits 60-63 unused
+ GATEWAY_ID(56, 7), // bits 56-62 gateway id (bit 63 would make the thread id negative)
+ RESERVED(63, 1); // bit 63 unused
/** the beginning bit position */
private final int position;
@@ -123,7 +123,8 @@ public class ThreadIdentifier implements DataSerializable {
* @return the shifted value
*/
public long shift(long val) {
- assert val <= mask();
+ assert val <= mask() : "Input value " + val + " is too large for " + this
+ + " which has a maximum of " + mask();
return val << position;
}
http://git-wip-us.apache.org/repos/asf/geode/blob/b0fbf72f/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index c4d503e..2c9a65d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -24,6 +24,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelCriterion;
@@ -1117,6 +1118,11 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
}
} else {
index = region.size();
+ if (index > ThreadIdentifier.Bits.GATEWAY_ID.mask()) {
+ throw new IllegalStateException(
+ LocalizedStrings.AbstractGatewaySender_CANNOT_CREATE_SENDER_0_BECAUSE_MAXIMUM_1_HAS_BEEN_REACHED
+ .toLocalizedString(getId(), ThreadIdentifier.Bits.GATEWAY_ID.mask() + 1));
+ }
region.put(getId(), index);
if (isDebugEnabled) {
messagePrefix = "Created new";
http://git-wip-us.apache.org/repos/asf/geode/blob/b0fbf72f/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
index 48b8a14..26a41ec 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
@@ -7661,6 +7661,10 @@ public class LocalizedStrings {
new StringId(6646,
"An unexpected exception occurred processing a BatchException. The thread will continue.");
+ public static final StringId AbstractGatewaySender_CANNOT_CREATE_SENDER_0_BECAUSE_MAXIMUM_1_HAS_BEEN_REACHED =
+ new StringId(6647,
+ "Cannot create GatewaySender {0} because the maximum ({1}) has been reached");
+
/** Testing strings, messageId 90000-99999 **/
/**
http://git-wip-us.apache.org/repos/asf/geode/blob/b0fbf72f/geode-core/src/test/java/org/apache/geode/internal/cache/wan/QueueListener.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/QueueListener.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/QueueListener.java
index 32032de..34b710d 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/QueueListener.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/QueueListener.java
@@ -69,6 +69,10 @@ public class QueueListener implements CacheListener {
updateList.add(event.getKey());
}
+ public int getNumEvents() {
+ return this.createList.size() + this.updateList.size() + this.destroyList.size();
+ }
+
public void close() {
// TODO Auto-generated method stub
http://git-wip-us.apache.org/repos/asf/geode/blob/b0fbf72f/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
index 34acfd4..7417c80 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
@@ -22,7 +22,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import com.jayway.awaitility.Awaitility;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -41,6 +43,7 @@ import org.apache.geode.cache30.MyGatewayEventFilter1;
import org.apache.geode.cache30.MyGatewayTransportFilter1;
import org.apache.geode.cache30.MyGatewayTransportFilter2;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.WANTestBase;
@@ -315,4 +318,91 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase {
}
}
+ /**
+ * Test to validate that the maximum number of senders can be created and used successfully.
+ */
+ @Test
+ public void testCreateMaximumSenders() {
+ // Create locators
+ Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ // Create receiver and region
+ vm2.invoke(() -> WANTestBase.createCache(nyPort));
+ vm2.invoke(
+ () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap()));
+ vm2.invoke(() -> WANTestBase.createReceiver());
+ vm2.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_RR"));
+
+ // Create maximum number of senders
+ vm4.invoke(() -> WANTestBase.createCache(lnPort));
+ StringBuilder builder = new StringBuilder();
+ long maxSenders = ThreadIdentifier.Bits.GATEWAY_ID.mask() + 1;
+ for (int i = 0; i < maxSenders; i++) {
+ String senderId = "ln-" + i;
+ builder.append(senderId);
+ if (i + 1 != maxSenders) {
+ builder.append(',');
+ }
+ vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers(senderId, 2, false, 100, 10,
+ false, false, null, false, 1, OrderPolicy.KEY));
+ }
+
+ // Create region with the sender ids
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR",
+ builder.toString(), isOffHeap()));
+
+ // Do puts
+ int numPuts = 100;
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", numPuts));
+
+ // Verify receiver listener events
+ vm2.invoke(() -> SerialGatewaySenderQueueDUnitTest.verifyListenerEvents(maxSenders * numPuts));
+ }
+
+ private static void verifyListenerEvents(final long expectedNumEvents) {
+ Awaitility.await().atMost(60, TimeUnit.SECONDS)
+ .until(() -> listener1.getNumEvents() == expectedNumEvents);
+ }
+
+ /**
+ * Test to validate that the maximum number of senders plus one fails to be created.
+ */
+ @Test
+ public void testCreateMaximumPlusOneSenders() {
+ // Create locators
+ Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ // Create receiver
+ vm2.invoke(() -> WANTestBase.createCache(nyPort));
+ vm2.invoke(
+ () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap()));
+ vm2.invoke(() -> WANTestBase.createReceiver());
+
+ // Create maximum number of senders
+ vm4.invoke(() -> WANTestBase.createCache(lnPort));
+ for (int i = 0; i < ThreadIdentifier.Bits.GATEWAY_ID.mask() + 1; i++) {
+ String senderId = "ln-" + i;
+ vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers(senderId, 2, false, 100, 10,
+ false, false, null, false, 1, OrderPolicy.KEY));
+ }
+
+ // Attempt to create one more sender
+ vm4.invoke(() -> SerialGatewaySenderQueueDUnitTest.attemptToCreateGatewaySenderOverLimit());
+ }
+
+ private static void attemptToCreateGatewaySenderOverLimit() {
+ IgnoredException exp =
+ IgnoredException.addIgnoredException(IllegalStateException.class.getName());
+ try {
+ createSenderWithMultipleDispatchers("ln-one-too-many", 2, false, 100, 10, false, false, null,
+ false, 1, OrderPolicy.KEY);
+ fail("Should not have been able to create gateway sender");
+ } catch (IllegalStateException e) {
+ /* ignore expected exception */
+ } finally {
+ exp.remove();
+ }
+ }
}