You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2019/04/12 22:14:08 UTC
[geode] 01/07: GEODE-6626: Make GatewayReceiver to use a single
instance
This is an automated email from the ASF dual-hosted git repository.
klund pushed a commit to branch GEODE-6626-gatewayReceiver-metrics
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 433c10eef8d3cd2aa4d4e618f89bf708f1915ef4
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Apr 12 14:49:14 2019 -0700
GEODE-6626: Make GatewayReceiver to use a single instance
Tests and code required there be only one GatewayReceiver,
so we refactored the code to no longer use a set. Instead we have
a single atomic reference.
Co-authored-by: Mark Hanson <mh...@pivotal.io>
---
.../geode/internal/cache/GemFireCacheImpl.java | 38 +++++-----------------
.../geode/internal/cache/GemFireCacheImplTest.java | 19 +++++++++--
2 files changed, 26 insertions(+), 31 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 1e4fcda..456846e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -68,6 +68,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
@@ -420,18 +421,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
*/
private final Set<AsyncEventQueue> allAsyncEventQueues = new CopyOnWriteArraySet<>();
- /**
- * Controls updates to the list of all gateway receivers
- *
- * @see #allGatewayReceivers
- */
- private final Object allGatewayReceiversLock = new Object();
-
- /**
- * the list of all gateway Receivers. It may be fetched safely (for enumeration), but updates must
- * by synchronized via {@link #allGatewayReceiversLock}
- */
- private volatile Set<GatewayReceiver> allGatewayReceivers = Collections.emptySet();
+ private final AtomicReference<GatewayReceiver> gatewayReceiver = new AtomicReference<>();
/**
* PartitionedRegion instances (for required-events notification
@@ -3800,28 +3790,14 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
public void addGatewayReceiver(GatewayReceiver receiver) {
throwIfClient();
this.stopper.checkCancelInProgress(null);
- synchronized (this.allGatewayReceiversLock) {
- Set<GatewayReceiver> newReceivers = new HashSet<>(this.allGatewayReceivers.size() + 1);
- if (!this.allGatewayReceivers.isEmpty()) {
- newReceivers.addAll(this.allGatewayReceivers);
- }
- newReceivers.add(receiver);
- this.allGatewayReceivers = Collections.unmodifiableSet(newReceivers);
- }
+ gatewayReceiver.set(receiver);
}
@Override
public void removeGatewayReceiver(GatewayReceiver receiver) {
throwIfClient();
this.stopper.checkCancelInProgress(null);
- synchronized (this.allGatewayReceiversLock) {
- Set<GatewayReceiver> newReceivers = new HashSet<>(this.allGatewayReceivers.size() + 1);
- if (!this.allGatewayReceivers.isEmpty()) {
- newReceivers.addAll(this.allGatewayReceivers);
- }
- newReceivers.remove(receiver);
- this.allGatewayReceivers = Collections.unmodifiableSet(newReceivers);
- }
+ gatewayReceiver.set(null);
}
@Override
@@ -3871,7 +3847,11 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
@Override
public Set<GatewayReceiver> getGatewayReceivers() {
- return this.allGatewayReceivers;
+ GatewayReceiver receiver = gatewayReceiver.get();
+ if (receiver == null) {
+ return Collections.emptySet();
+ }
+ return Collections.singleton(receiver);
}
@Override
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
index 71d982d..9bf8b6a 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
@@ -249,8 +249,24 @@ public class GemFireCacheImplTest {
}
@Test
- public void removeGatewayReceiverShouldRemoveFromReceiversList() {
+ public void addGatewayReceiverDoesNotAllowMoreThanOneReceiver() {
GatewayReceiver receiver = mock(GatewayReceiver.class);
+ GatewayReceiver receiver2 = mock(GatewayReceiver.class);
+
+ gemFireCacheImpl = createGemFireCacheImpl();
+ gemFireCacheImpl.addGatewayReceiver(receiver);
+ assertEquals(1, gemFireCacheImpl.getGatewayReceivers().size());
+
+ gemFireCacheImpl.addGatewayReceiver(receiver2);
+
+ assertEquals(1, gemFireCacheImpl.getGatewayReceivers().size());
+ assertThat(gemFireCacheImpl.getGatewayReceivers()).containsOnly(receiver2);
+ }
+
+ @Test
+ public void removeGatewayReceiverRemovesTheReceiver() {
+ GatewayReceiver receiver = mock(GatewayReceiver.class);
+
gemFireCacheImpl = createGemFireCacheImpl();
gemFireCacheImpl.addGatewayReceiver(receiver);
assertEquals(1, gemFireCacheImpl.getGatewayReceivers().size());
@@ -260,7 +276,6 @@ public class GemFireCacheImplTest {
assertEquals(0, gemFireCacheImpl.getGatewayReceivers().size());
}
-
@Test
public void removeFromCacheServerShouldRemoveFromCacheServersList() {
gemFireCacheImpl = createGemFireCacheImpl();