You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2019/04/12 22:14:08 UTC

[geode] 01/07: GEODE-6626: Make GatewayReceiver to use a single instance

This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch GEODE-6626-gatewayReceiver-metrics
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 433c10eef8d3cd2aa4d4e618f89bf708f1915ef4
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Apr 12 14:49:14 2019 -0700

    GEODE-6626: Make GatewayReceiver to use a single instance
    
    Tests and code required that there be only one GatewayReceiver,
    so we refactored the code to no longer use a set. Instead, the cache
    now holds the receiver in a single atomic reference.
    
    Co-authored-by: Mark Hanson <mh...@pivotal.io>
---
 .../geode/internal/cache/GemFireCacheImpl.java     | 38 +++++-----------------
 .../geode/internal/cache/GemFireCacheImplTest.java | 19 +++++++++--
 2 files changed, 26 insertions(+), 31 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 1e4fcda..456846e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -68,6 +68,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiPredicate;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
@@ -420,18 +421,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
    */
   private final Set<AsyncEventQueue> allAsyncEventQueues = new CopyOnWriteArraySet<>();
 
-  /**
-   * Controls updates to the list of all gateway receivers
-   *
-   * @see #allGatewayReceivers
-   */
-  private final Object allGatewayReceiversLock = new Object();
-
-  /**
-   * the list of all gateway Receivers. It may be fetched safely (for enumeration), but updates must
-   * by synchronized via {@link #allGatewayReceiversLock}
-   */
-  private volatile Set<GatewayReceiver> allGatewayReceivers = Collections.emptySet();
+  private final AtomicReference<GatewayReceiver> gatewayReceiver = new AtomicReference<>();
 
   /**
    * PartitionedRegion instances (for required-events notification
@@ -3800,28 +3790,14 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
   public void addGatewayReceiver(GatewayReceiver receiver) {
     throwIfClient();
     this.stopper.checkCancelInProgress(null);
-    synchronized (this.allGatewayReceiversLock) {
-      Set<GatewayReceiver> newReceivers = new HashSet<>(this.allGatewayReceivers.size() + 1);
-      if (!this.allGatewayReceivers.isEmpty()) {
-        newReceivers.addAll(this.allGatewayReceivers);
-      }
-      newReceivers.add(receiver);
-      this.allGatewayReceivers = Collections.unmodifiableSet(newReceivers);
-    }
+    gatewayReceiver.set(receiver);
   }
 
   @Override
   public void removeGatewayReceiver(GatewayReceiver receiver) {
     throwIfClient();
     this.stopper.checkCancelInProgress(null);
-    synchronized (this.allGatewayReceiversLock) {
-      Set<GatewayReceiver> newReceivers = new HashSet<>(this.allGatewayReceivers.size() + 1);
-      if (!this.allGatewayReceivers.isEmpty()) {
-        newReceivers.addAll(this.allGatewayReceivers);
-      }
-      newReceivers.remove(receiver);
-      this.allGatewayReceivers = Collections.unmodifiableSet(newReceivers);
-    }
+    gatewayReceiver.set(null);
   }
 
   @Override
@@ -3871,7 +3847,11 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
 
   @Override
   public Set<GatewayReceiver> getGatewayReceivers() {
-    return this.allGatewayReceivers;
+    GatewayReceiver receiver = gatewayReceiver.get();
+    if (receiver == null) {
+      return Collections.emptySet();
+    }
+    return Collections.singleton(receiver);
   }
 
   @Override
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
index 71d982d..9bf8b6a 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
@@ -249,8 +249,24 @@ public class GemFireCacheImplTest {
   }
 
   @Test
-  public void removeGatewayReceiverShouldRemoveFromReceiversList() {
+  public void addGatewayReceiverDoesNotAllowMoreThanOneReceiver() {
     GatewayReceiver receiver = mock(GatewayReceiver.class);
+    GatewayReceiver receiver2 = mock(GatewayReceiver.class);
+
+    gemFireCacheImpl = createGemFireCacheImpl();
+    gemFireCacheImpl.addGatewayReceiver(receiver);
+    assertEquals(1, gemFireCacheImpl.getGatewayReceivers().size());
+
+    gemFireCacheImpl.addGatewayReceiver(receiver2);
+
+    assertEquals(1, gemFireCacheImpl.getGatewayReceivers().size());
+    assertThat(gemFireCacheImpl.getGatewayReceivers()).containsOnly(receiver2);
+  }
+
+  @Test
+  public void removeGatewayReceiverRemovesTheReceiver() {
+    GatewayReceiver receiver = mock(GatewayReceiver.class);
+
     gemFireCacheImpl = createGemFireCacheImpl();
     gemFireCacheImpl.addGatewayReceiver(receiver);
     assertEquals(1, gemFireCacheImpl.getGatewayReceivers().size());
@@ -260,7 +276,6 @@ public class GemFireCacheImplTest {
     assertEquals(0, gemFireCacheImpl.getGatewayReceivers().size());
   }
 
-
   @Test
   public void removeFromCacheServerShouldRemoveFromCacheServersList() {
     gemFireCacheImpl = createGemFireCacheImpl();