You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2019/04/12 22:14:07 UTC

[geode] branch GEODE-6626-gatewayReceiver-metrics created (now 2e0b5b1)

This is an automated email from the ASF dual-hosted git repository.

klund pushed a change to branch GEODE-6626-gatewayReceiver-metrics
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at 2e0b5b1  wip

This branch includes the following new commits:

     new 433c10e  GEODE-6626: Make GatewayReceiver to use a single instance
     new a142644  GEODE-6626: Cleanup GatewayReceiverImpl and its unit test
     new beb5373  GEODE-6626: Expand GatewayReceiverFactoryImplTest
     new 7bc3864  GEODE-6626: Make GatewayReceiverCommand unit testable
     new 4d23e7e  GEODE-6626: Remove unused method from GatewayReceiverMBean
     new f6af2e5  GEODE-6626: Fix getStartPort in GatewayReceiverMBeanBridge
     new 2e0b5b1  wip

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 02/07: GEODE-6626: Cleanup GatewayReceiverImpl and its unit test

Posted by kl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch GEODE-6626-gatewayReceiver-metrics
in repository https://gitbox.apache.org/repos/asf/geode.git

commit a1426440f845b9a037d95373cb7787acf30cde60
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Apr 12 15:04:06 2019 -0700

    GEODE-6626: Cleanup GatewayReceiverImpl and its unit test
    
    Co-authored-by: Mark Hanson <mh...@pivotal.io>
---
 .../internal/cache/wan/GatewayReceiverImpl.java    | 104 ++++++++++-----------
 ...JUnitTest.java => GatewayReceiverImplTest.java} |  10 +-
 2 files changed, 51 insertions(+), 63 deletions(-)

diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
index fcdd011..757fe2b 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
@@ -14,6 +14,10 @@
  */
 package org.apache.geode.internal.cache.wan;
 
+import static org.apache.geode.internal.AvailablePort.SOCKET;
+import static org.apache.geode.internal.AvailablePort.getAddress;
+import static org.apache.geode.internal.AvailablePort.getRandomAvailablePortInRange;
+
 import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.Arrays;
@@ -40,38 +44,28 @@ public class GatewayReceiverImpl implements GatewayReceiver {
 
   private static final Logger logger = LogService.getLogger();
 
-  private String hostnameForSenders;
-
-  private int startPort;
-
-  private int endPort;
-
-  private int port;
-
-  private int timeBetPings;
-
-  private int socketBufferSize;
-
-  private boolean manualStart;
-
+  private final InternalCache cache;
+  private final String hostnameForSenders;
+  private final int startPort;
+  private final int endPort;
+  private final int timeBetPings;
+  private final int socketBufferSize;
+  private final boolean manualStart;
   private final List<GatewayTransportFilter> filters;
-
-  private String bindAdd;
+  private final String bindAdd;
 
   private CacheServer receiver;
+  private int port;
 
-  private final InternalCache cache;
-
-  public GatewayReceiverImpl(InternalCache cache, int startPort, int endPort, int timeBetPings,
+  GatewayReceiverImpl(InternalCache cache, int startPort, int endPort, int timeBetPings,
       int buffSize, String bindAdd, List<GatewayTransportFilter> filters, String hostnameForSenders,
       boolean manualStart) {
     this.cache = cache;
-
     this.hostnameForSenders = hostnameForSenders;
     this.startPort = startPort;
     this.endPort = endPort;
     this.timeBetPings = timeBetPings;
-    this.socketBufferSize = buffSize;
+    socketBufferSize = buffSize;
     this.bindAdd = bindAdd;
     this.filters = filters;
     this.manualStart = manualStart;
@@ -99,44 +93,43 @@ public class GatewayReceiverImpl implements GatewayReceiver {
     try {
       return SocketCreator.getLocalHost().getHostName();
     } catch (UnknownHostException e) {
-      throw new IllegalStateException(
-          "Could not get host name", e);
+      throw new IllegalStateException("Could not get host name", e);
     }
   }
 
   @Override
   public List<GatewayTransportFilter> getGatewayTransportFilters() {
-    return this.filters;
+    return filters;
   }
 
   @Override
   public int getMaximumTimeBetweenPings() {
-    return this.timeBetPings;
+    return timeBetPings;
   }
 
   @Override
   public int getPort() {
-    return this.port;
+    return port;
   }
 
   @Override
   public int getStartPort() {
-    return this.startPort;
+    return startPort;
   }
 
   @Override
   public int getEndPort() {
-    return this.endPort;
+    return endPort;
   }
 
   @Override
   public int getSocketBufferSize() {
-    return this.socketBufferSize;
+    return socketBufferSize;
   }
 
   @Override
   public boolean isManualStart() {
-    return this.manualStart;
+    return manualStart;
   }
 
   @Override
@@ -145,8 +138,7 @@ public class GatewayReceiverImpl implements GatewayReceiver {
   }
 
   private boolean tryToStart(int port) {
-    if (!AvailablePort.isPortAvailable(port, AvailablePort.SOCKET,
-        AvailablePort.getAddress(AvailablePort.SOCKET))) {
+    if (!AvailablePort.isPortAvailable(port, SOCKET, getAddress(SOCKET))) {
       return false;
     }
 
@@ -158,7 +150,7 @@ public class GatewayReceiverImpl implements GatewayReceiver {
     }
     receiver.setBindAddress(bindAdd);
     receiver.setGroups(new String[] {GatewayReceiver.RECEIVER_GROUP});
-    ((CacheServerImpl) receiver).setGatewayTransportFilter(this.filters);
+    ((CacheServerImpl) receiver).setGatewayTransportFilter(filters);
     try {
       receiver.start();
       this.port = port;
@@ -171,9 +163,9 @@ public class GatewayReceiverImpl implements GatewayReceiver {
   }
 
   @Override
-  public void start() throws IOException {
+  public void start() {
     if (receiver == null) {
-      receiver = this.cache.addCacheServer(true);
+      receiver = cache.addCacheServer(true);
     }
     if (receiver.isRunning()) {
       logger.warn("Gateway Receiver is already running");
@@ -182,6 +174,7 @@ public class GatewayReceiverImpl implements GatewayReceiver {
 
     int loopStartPort = getPortToStart();
     int port = loopStartPort;
+
     while (!tryToStart(port)) {
       // get next port to try
       if (port == endPort && startPort != endPort) {
@@ -191,71 +184,69 @@ public class GatewayReceiverImpl implements GatewayReceiver {
       }
       if (port == loopStartPort || port > endPort) {
         throw new GatewayReceiverException("No available free port found in the given range (" +
-            this.startPort + "-" + this.endPort + ")");
+            startPort + "-" + endPort + ")");
       }
     }
 
-    logger
-        .info("The GatewayReceiver started on port : {}", this.port);
+    logger.info("The GatewayReceiver started on port : {}", this.port);
 
-    InternalDistributedSystem system = this.cache.getInternalDistributedSystem();
+    InternalDistributedSystem system = cache.getInternalDistributedSystem();
     system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_START, this);
   }
 
   private int getPortToStart() {
     // choose a random port from the given port range
-    int rPort;
-    if (this.startPort == this.endPort) {
-      rPort = this.startPort;
+    int randomPort;
+    if (startPort == endPort) {
+      randomPort = startPort;
     } else {
-      rPort = AvailablePort.getRandomAvailablePortInRange(this.startPort, this.endPort,
-          AvailablePort.SOCKET);
+      randomPort = getRandomAvailablePortInRange(startPort, endPort, SOCKET);
     }
-    return rPort;
+    return randomPort;
   }
 
   @Override
   public void stop() {
     if (!isRunning()) {
-      throw new GatewayReceiverException(
-          "Gateway Receiver is not running");
+      throw new GatewayReceiverException("Gateway Receiver is not running");
     }
     receiver.stop();
   }
 
   @Override
   public void destroy() {
-    logger.info("Destroying Gateway Receiver: " + this);
+    logger.info("Destroying Gateway Receiver: {}", this);
     if (receiver == null) {
       // receiver was not started
-      this.cache.removeGatewayReceiver(this);
+      cache.removeGatewayReceiver(this);
     } else {
       if (receiver.isRunning()) {
         throw new GatewayReceiverException(
             "Gateway Receiver is running and needs to be stopped first");
       }
-      this.cache.removeGatewayReceiver(this);
-      this.cache.removeCacheServer(receiver);
+      cache.removeGatewayReceiver(this);
+      cache.removeCacheServer(receiver);
     }
-    InternalDistributedSystem system = this.cache.getInternalDistributedSystem();
+    InternalDistributedSystem system = cache.getInternalDistributedSystem();
     system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_DESTROY, this);
   }
 
   @Override
   public String getBindAddress() {
-    return this.bindAdd;
+    return bindAdd;
   }
 
   @Override
   public boolean isRunning() {
-    if (this.receiver != null) {
-      return this.receiver.isRunning();
+    if (receiver != null) {
+      return receiver.isRunning();
     }
     return false;
   }
 
+  @Override
   public String toString() {
-    return new StringBuffer().append("Gateway Receiver").append("@")
+    return new StringBuilder().append("Gateway Receiver").append("@")
         .append(Integer.toHexString(hashCode())).append("'; port=").append(getPort())
         .append("; bindAddress=").append(getBindAddress()).append("'; hostnameForSenders=")
         .append(getHostnameForSenders()).append("; maximumTimeBetweenPings=")
@@ -264,5 +255,4 @@ public class GatewayReceiverImpl implements GatewayReceiver {
         .append("; group=").append(Arrays.toString(new String[] {GatewayReceiver.RECEIVER_GROUP}))
         .append("]").toString();
   }
-
 }
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplTest.java
similarity index 97%
rename from geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java
rename to geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplTest.java
index 2d9b8ec..ca6819e 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplTest.java
@@ -38,7 +38,7 @@ import org.apache.geode.internal.cache.CacheServerImpl;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.net.SocketCreator;
 
-public class GatewayReceiverImplJUnitTest {
+public class GatewayReceiverImplTest {
 
   @Test
   public void getHostOnUnstartedGatewayShouldReturnLocalhost() throws UnknownHostException {
@@ -49,7 +49,7 @@ public class GatewayReceiverImplJUnitTest {
   }
 
   @Test
-  public void getHostOnRunningGatewayShouldReturnCacheServerAddress() throws IOException {
+  public void getHostOnRunningGatewayShouldReturnCacheServerAddress() {
     InternalCache cache = mock(InternalCache.class);
     CacheServerImpl server = mock(CacheServerImpl.class);
     InternalDistributedSystem system = mock(InternalDistributedSystem.class);
@@ -83,8 +83,7 @@ public class GatewayReceiverImplJUnitTest {
   }
 
   @Test
-  public void destroyCalledOnStoppedGatewayReceiverShouldRemoveReceiverFromCacheServers()
-      throws IOException {
+  public void destroyCalledOnStoppedGatewayReceiverShouldRemoveReceiverFromCacheServers() {
     InternalCache cache = mock(InternalCache.class);
     CacheServerImpl server = mock(CacheServerImpl.class);
     InternalDistributedSystem system = mock(InternalDistributedSystem.class);
@@ -100,8 +99,7 @@ public class GatewayReceiverImplJUnitTest {
   }
 
   @Test
-  public void destroyCalledOnStoppedGatewayReceiverShouldRemoveReceiverFromReceivers()
-      throws IOException {
+  public void destroyCalledOnStoppedGatewayReceiverShouldRemoveReceiverFromReceivers() {
     InternalCache cache = mock(InternalCache.class);
     CacheServerImpl server = mock(CacheServerImpl.class);
     InternalDistributedSystem system = mock(InternalDistributedSystem.class);


[geode] 01/07: GEODE-6626: Make GatewayReceiver to use a single instance

Posted by kl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch GEODE-6626-gatewayReceiver-metrics
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 433c10eef8d3cd2aa4d4e618f89bf708f1915ef4
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Apr 12 14:49:14 2019 -0700

    GEODE-6626: Make GatewayReceiver to use a single instance
    
    Tests and code required there be only one GatewayReceiver,
    so we refactored the code to no longer use a set. Instead we have
    a single atomic reference.
    
    Co-authored-by: Mark Hanson <mh...@pivotal.io>
---
 .../geode/internal/cache/GemFireCacheImpl.java     | 38 +++++-----------------
 .../geode/internal/cache/GemFireCacheImplTest.java | 19 +++++++++--
 2 files changed, 26 insertions(+), 31 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 1e4fcda..456846e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -68,6 +68,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiPredicate;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
@@ -420,18 +421,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
    */
   private final Set<AsyncEventQueue> allAsyncEventQueues = new CopyOnWriteArraySet<>();
 
-  /**
-   * Controls updates to the list of all gateway receivers
-   *
-   * @see #allGatewayReceivers
-   */
-  private final Object allGatewayReceiversLock = new Object();
-
-  /**
-   * the list of all gateway Receivers. It may be fetched safely (for enumeration), but updates must
-   * by synchronized via {@link #allGatewayReceiversLock}
-   */
-  private volatile Set<GatewayReceiver> allGatewayReceivers = Collections.emptySet();
+  private final AtomicReference<GatewayReceiver> gatewayReceiver = new AtomicReference<>();
 
   /**
    * PartitionedRegion instances (for required-events notification
@@ -3800,28 +3790,14 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
   public void addGatewayReceiver(GatewayReceiver receiver) {
     throwIfClient();
     this.stopper.checkCancelInProgress(null);
-    synchronized (this.allGatewayReceiversLock) {
-      Set<GatewayReceiver> newReceivers = new HashSet<>(this.allGatewayReceivers.size() + 1);
-      if (!this.allGatewayReceivers.isEmpty()) {
-        newReceivers.addAll(this.allGatewayReceivers);
-      }
-      newReceivers.add(receiver);
-      this.allGatewayReceivers = Collections.unmodifiableSet(newReceivers);
-    }
+    gatewayReceiver.set(receiver);
   }
 
   @Override
   public void removeGatewayReceiver(GatewayReceiver receiver) {
     throwIfClient();
     this.stopper.checkCancelInProgress(null);
-    synchronized (this.allGatewayReceiversLock) {
-      Set<GatewayReceiver> newReceivers = new HashSet<>(this.allGatewayReceivers.size() + 1);
-      if (!this.allGatewayReceivers.isEmpty()) {
-        newReceivers.addAll(this.allGatewayReceivers);
-      }
-      newReceivers.remove(receiver);
-      this.allGatewayReceivers = Collections.unmodifiableSet(newReceivers);
-    }
+    gatewayReceiver.set(null);
   }
 
   @Override
@@ -3871,7 +3847,11 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
 
   @Override
   public Set<GatewayReceiver> getGatewayReceivers() {
-    return this.allGatewayReceivers;
+    GatewayReceiver receiver = gatewayReceiver.get();
+    if (receiver == null) {
+      return Collections.emptySet();
+    }
+    return Collections.singleton(receiver);
   }
 
   @Override
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
index 71d982d..9bf8b6a 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
@@ -249,8 +249,24 @@ public class GemFireCacheImplTest {
   }
 
   @Test
-  public void removeGatewayReceiverShouldRemoveFromReceiversList() {
+  public void addGatewayReceiverDoesNotAllowMoreThanOneReceiver() {
     GatewayReceiver receiver = mock(GatewayReceiver.class);
+    GatewayReceiver receiver2 = mock(GatewayReceiver.class);
+
+    gemFireCacheImpl = createGemFireCacheImpl();
+    gemFireCacheImpl.addGatewayReceiver(receiver);
+    assertEquals(1, gemFireCacheImpl.getGatewayReceivers().size());
+
+    gemFireCacheImpl.addGatewayReceiver(receiver2);
+
+    assertEquals(1, gemFireCacheImpl.getGatewayReceivers().size());
+    assertThat(gemFireCacheImpl.getGatewayReceivers()).containsOnly(receiver2);
+  }
+
+  @Test
+  public void removeGatewayReceiverRemovesTheReceiver() {
+    GatewayReceiver receiver = mock(GatewayReceiver.class);
+
     gemFireCacheImpl = createGemFireCacheImpl();
     gemFireCacheImpl.addGatewayReceiver(receiver);
     assertEquals(1, gemFireCacheImpl.getGatewayReceivers().size());
@@ -260,7 +276,6 @@ public class GemFireCacheImplTest {
     assertEquals(0, gemFireCacheImpl.getGatewayReceivers().size());
   }
 
-
   @Test
   public void removeFromCacheServerShouldRemoveFromCacheServersList() {
     gemFireCacheImpl = createGemFireCacheImpl();


[geode] 06/07: GEODE-6626: Fix getStartPort in GatewayReceiverMBeanBridge

Posted by kl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch GEODE-6626-gatewayReceiver-metrics
in repository https://gitbox.apache.org/repos/asf/geode.git

commit f6af2e56e327655030942d1bbb3c32c9ee7a4bd2
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Apr 12 15:11:43 2019 -0700

    GEODE-6626: Fix getStartPort in GatewayReceiverMBeanBridge
    
    Co-authored-by: Michael Oleske <mo...@pivotal.io>
---
 .../geode/management/internal/beans/GatewayReceiverMBeanBridge.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBeanBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBeanBridge.java
index ec6ccfd..73a7732 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBeanBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBeanBridge.java
@@ -137,7 +137,7 @@ public class GatewayReceiverMBeanBridge extends ServerBridge {
 
 
   public int getStartPort() {
-    return rcv.getEndPort();
+    return rcv.getStartPort();
   }
 
 


[geode] 04/07: GEODE-6626: Make GatewayReceiverCommand unit testable

Posted by kl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch GEODE-6626-gatewayReceiver-metrics
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 7bc3864a525284188e6ad59f192e90fa1538f80d
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Apr 12 15:08:05 2019 -0700

    GEODE-6626: Make GatewayReceiverCommand unit testable
    
    Co-authored-by: Michael Oleske <mo...@pivotal.io>
---
 .../sockets/command/GatewayReceiverCommand.java    | 180 +++++++--------------
 1 file changed, 62 insertions(+), 118 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
index 4ba2721..cffa887 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
@@ -43,6 +43,7 @@ import org.apache.geode.internal.cache.tier.sockets.Part;
 import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
 import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewayReceiverMetrics;
 import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
 import org.apache.geode.internal.security.AuthorizeRequest;
 import org.apache.geode.internal.security.SecurityService;
@@ -57,45 +58,37 @@ import org.apache.geode.pdx.internal.PeerTypeRegistration;
 public class GatewayReceiverCommand extends BaseCommand {
 
   @Immutable
-  private static final GatewayReceiverCommand singleton = new GatewayReceiverCommand();
+  private static final GatewayReceiverCommand SINGLETON = new GatewayReceiverCommand();
 
   public static Command getCommand() {
-    return singleton;
+    return SINGLETON;
   }
 
-  private GatewayReceiverCommand() {}
+  GatewayReceiverCommand() {
+    // nothing
+  }
 
   private void handleRegionNull(ServerConnection servConn, String regionName, int batchId) {
     InternalCache cache = servConn.getCachedRegionHelper().getCacheForGatewayCommand();
     if (cache != null && cache.isCacheAtShutdownAll()) {
       throw cache.getCacheClosedException("Shutdown occurred during message processing");
-    } else {
-      String reason = String.format("Region %s was not found during batch create request %s",
-          new Object[] {regionName, Integer.valueOf(batchId)});
-      throw new RegionDestroyedException(reason, regionName);
     }
+    String reason = String.format("Region %s was not found during batch create request %s",
+        regionName, batchId);
+    throw new RegionDestroyedException(reason, regionName);
   }
 
   @Override
   public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection,
       final SecurityService securityService, long start) throws IOException, InterruptedException {
-    Part regionNamePart = null, keyPart = null, valuePart = null, callbackArgPart = null;
-    String regionName = null;
-    Object callbackArg = null, key = null;
-    int partNumber = 0;
     CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
     GatewayReceiverStats stats = (GatewayReceiverStats) serverConnection.getCacheServerStats();
-    EventID eventId = null;
-    LocalRegion region = null;
-    List<BatchException70> exceptions = new ArrayList<BatchException70>();
-    Throwable fatalException = null;
-    // requiresResponse = true;// let PROCESS_BATCH deal with this itself
+
     {
       long oldStart = start;
       start = DistributionStats.getStatTime();
       stats.incReadProcessBatchRequestTime(start - oldStart);
     }
-    Part callbackArgExistsPart;
 
     stats.incBatchSize(clientMessage.getPayloadLength());
 
@@ -130,60 +123,46 @@ public class GatewayReceiverCommand extends BaseCommand {
     if (batchId != serverConnection.getLatestBatchIdReplied() + 1) {
       logger.warn(
           "Received process batch request {} out of order. The id of the last batch processed was {}. This batch request will be processed, but some messages may have been lost.",
-          new Object[] {batchId, serverConnection.getLatestBatchIdReplied()});
+          batchId, serverConnection.getLatestBatchIdReplied());
       stats.incOutoforderBatchesReceived();
     }
 
-
     if (logger.isDebugEnabled()) {
       logger.debug("Received process batch request {} that will be processed.", batchId);
     }
 
-
     if (logger.isDebugEnabled()) {
       logger.debug(
           "{}: Received process batch request {} containing {} events ({} bytes) with {} acknowledgement on {}",
           serverConnection.getName(), batchId, numberOfEvents, clientMessage.getPayloadLength(),
           "normal", serverConnection.getSocketString());
     }
-    // logger.warn("Received process batch request " + batchId + " containing
-    // " + numberOfEvents + " events (" + msg.getPayloadLength() + " bytes) with
-    // " + (earlyAck ? "early" : "normal") + " acknowledgement on " +
-    // getSocketString());
-    // if (earlyAck) {
-    // logger.warn("Sent process batch early response for batch " + batchId +
-    // " containing " + numberOfEvents + " events (" + msg.getPayloadLength() +
-    // " bytes) with " + (earlyAck ? "early" : "normal") + " acknowledgement on
-    // " + getSocketString());
-    // }
 
     // Retrieve the events from the message parts. The '2' below
     // represents the number of events (part0) and the batchId (part1)
-    partNumber = 2;
+    int partNumber = 2;
     int dsid = clientMessage.getPart(partNumber++).getInt();
 
-    boolean removeOnException =
-        clientMessage.getPart(partNumber++).getSerializedForm()[0] == 1 ? true : false;
+    boolean removeOnException = clientMessage.getPart(partNumber++).getSerializedForm()[0] == 1;
 
     // event received in batch also have PDX events at the start of the batch,to
     // represent correct index on which the exception occurred, number of PDX
     // events need to be subtracted.
     int indexWithoutPDXEvent = -1; //
+    Part valuePart = null;
+    Throwable fatalException = null;
+    List<BatchException70> exceptions = new ArrayList<>();
     for (int i = 0; i < numberOfEvents; i++) {
-      boolean retry = true;
-      boolean isPdxEvent = false;
       indexWithoutPDXEvent++;
-      // System.out.println("Processing event " + i + " in batch " + batchId + "
-      // starting with part number " + partNumber);
+
       Part actionTypePart = clientMessage.getPart(partNumber);
       int actionType = actionTypePart.getInt();
 
-      long versionTimeStamp = VersionTag.ILLEGAL_VERSION_TIMESTAMP;
-      EventIDHolder clientEvent = null;
-
       boolean callbackArgExists = false;
 
       try {
+        boolean isPdxEvent = false;
+        boolean retry = true;
         do {
           if (isPdxEvent) {
             // This is a retried event. Reset the PDX event index.
@@ -197,21 +176,19 @@ public class GatewayReceiverCommand extends BaseCommand {
           } catch (Exception e) {
             logger.warn(String.format(
                 "%s: Caught exception processing batch request %s containing %s events",
-                new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                    Integer.valueOf(numberOfEvents)}),
-                e);
+                serverConnection.getName(), batchId, numberOfEvents), e);
             handleException(removeOnException, stats, e);
             break;
           }
           boolean possibleDuplicate = possibleDuplicatePartBytes[0] == 0x01;
 
           // Make sure instance variables are null before each iteration
-          regionName = null;
-          key = null;
-          callbackArg = null;
+          String regionName = null;
+          Object key = null;
+          Object callbackArg = null;
 
           // Retrieve the region name from the message parts
-          regionNamePart = clientMessage.getPart(partNumber + 2);
+          Part regionNamePart = clientMessage.getPart(partNumber + 2);
           regionName = regionNamePart.getString();
           if (regionName.equals(PeerTypeRegistration.REGION_FULL_PATH)) {
             indexWithoutPDXEvent--;
@@ -226,32 +203,34 @@ public class GatewayReceiverCommand extends BaseCommand {
           Part eventIdPart = clientMessage.getPart(partNumber + 3);
           eventIdPart.setVersion(serverConnection.getClientVersion());
           // String eventId = eventIdPart.getString();
+          EventID eventId;
           try {
             eventId = (EventID) eventIdPart.getObject();
           } catch (Exception e) {
             logger.warn(String.format(
                 "%s: Caught exception processing batch request %s containing %s events",
-                new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                    Integer.valueOf(numberOfEvents)}),
-                e);
+                serverConnection.getName(), batchId, numberOfEvents), e);
             handleException(removeOnException, stats, e);
             break;
           }
 
           // Retrieve the key from the message parts
-          keyPart = clientMessage.getPart(partNumber + 4);
+          Part keyPart = clientMessage.getPart(partNumber + 4);
           try {
             key = keyPart.getStringOrObject();
           } catch (Exception e) {
             logger.warn(String.format(
                 "%s: Caught exception processing batch request %s containing %s events",
-                new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                    Integer.valueOf(numberOfEvents)}),
-                e);
+                serverConnection.getName(), batchId, numberOfEvents), e);
             handleException(removeOnException, stats, e);
             break;
           }
-          int index = -1;
+          int index;
+          Part callbackArgPart;
+          EventIDHolder clientEvent;
+          long versionTimeStamp;
+          Part callbackArgExistsPart;
+          LocalRegion region;
           switch (actionType) {
             case 0: // Create
               try {
@@ -286,9 +265,7 @@ public class GatewayReceiverCommand extends BaseCommand {
                     logger
                         .warn(String.format(
                             "%s: Caught exception processing batch create request %s for %s events",
-                            new Object[] {serverConnection.getName(),
-                                Integer.valueOf(batchId),
-                                Integer.valueOf(numberOfEvents)}),
+                            serverConnection.getName(), batchId, numberOfEvents),
                             e);
                     throw e;
                   }
@@ -303,17 +280,13 @@ public class GatewayReceiverCommand extends BaseCommand {
                 // Process the create request
                 if (key == null || regionName == null) {
                   String message = null;
-                  Object[] messageArgs =
-                      new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
                   if (key == null) {
-                    message =
-                        "%s: The input region name for the batch create request %s is null";
+                    message = "%s: The input key for the batch create request %s is null";
                   }
                   if (regionName == null) {
-                    message =
-                        "%s: The input region name for the batch create request %s is null";
+                    message = "%s: The input region name for the batch create request %s is null";
                   }
-                  String s = String.format(message, messageArgs);
+                  String s = String.format(message, serverConnection.getName(), batchId);
                   logger.warn(s);
                   throw new Exception(s);
                 }
@@ -366,29 +339,19 @@ public class GatewayReceiverCommand extends BaseCommand {
                     throw new Exception(
                         String.format(
                             "%s: Failed to create or update entry for region %s key %s value %s callbackArg %s",
-                            new Object[] {serverConnection.getName(), regionName,
-                                key, valuePart, callbackArg}));
+                            serverConnection.getName(), regionName, key, valuePart, callbackArg));
                   }
                 }
               } catch (Exception e) {
                 logger.warn(String.format(
                     "%s: Caught exception processing batch create request %s for %s events",
-                    new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                        Integer.valueOf(numberOfEvents)}),
-                    e);
+                    serverConnection.getName(), batchId, numberOfEvents), e);
                 handleException(removeOnException, stats, e);
               }
               break;
+
             case 1: // Update
               try {
-                /*
-                 * CLIENT EXCEPTION HANDLING TESTING CODE keySt = (String) key;
-                 * System.out.println("Processing updated key: " + key); if
-                 * (keySt.startsWith("failure")) { throw new Exception(LocalizedStrings
-                 * .ProcessBatch_THIS_EXCEPTION_REPRESENTS_A_FAILURE_ON_THE_SERVER
-                 * )); }
-                 */
-
                 // Retrieve the value from the message parts (do not deserialize it)
                 valuePart = clientMessage.getPart(partNumber + 5);
                 // try {
@@ -412,9 +375,7 @@ public class GatewayReceiverCommand extends BaseCommand {
                         .warn(
                             String.format(
                                 "%s: Caught exception processing batch update request %s containing %s events",
-                                new Object[] {serverConnection.getName(),
-                                    Integer.valueOf(batchId),
-                                    Integer.valueOf(numberOfEvents)}),
+                                serverConnection.getName(), batchId, numberOfEvents),
                             e);
                     throw e;
                   }
@@ -429,17 +390,13 @@ public class GatewayReceiverCommand extends BaseCommand {
                 // Process the update request
                 if (key == null || regionName == null) {
                   String message = null;
-                  Object[] messageArgs =
-                      new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
                   if (key == null) {
-                    message =
-                        "%s: The input key for the batch update request %s is null";
+                    message = "%s: The input key for the batch update request %s is null";
                   }
                   if (regionName == null) {
-                    message =
-                        "%s: The input region name for the batch update request %s is null";
+                    message = "%s: The input region name for the batch update request %s is null";
                   }
-                  String s = String.format(message, messageArgs);
+                  String s = String.format(message, serverConnection.getName(), batchId);
                   logger.warn(s);
                   throw new Exception(s);
                 }
@@ -478,11 +435,10 @@ public class GatewayReceiverCommand extends BaseCommand {
                     stats.incUpdateRequest();
                     retry = false;
                   } else {
-                    final Object[] msgArgs = new Object[] {serverConnection.getName(), regionName,
-                        key, valuePart, callbackArg};
                     final String message =
                         "%s: Failed to update entry for region %s, key %s, value %s, and callbackArg %s";
-                    String s = String.format(message, msgArgs);
+                    String s = String.format(message, serverConnection.getName(), regionName,
+                        key, valuePart, callbackArg);
                     logger.info(s);
                     throw new Exception(s);
                   }
@@ -491,12 +447,11 @@ public class GatewayReceiverCommand extends BaseCommand {
                 // Preserve the connection under all circumstances
                 logger.warn(String.format(
                     "%s: Caught exception processing batch update request %s containing %s events",
-                    new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                        Integer.valueOf(numberOfEvents)}),
-                    e);
+                    serverConnection.getName(), batchId, numberOfEvents), e);
                 handleException(removeOnException, stats, e);
               }
               break;
+
             case 2: // Destroy
               try {
                 // Retrieve the callbackArg from the message parts if necessary
@@ -515,9 +470,7 @@ public class GatewayReceiverCommand extends BaseCommand {
                         .warn(
                             String.format(
                                 "%s: Caught exception processing batch destroy request %s containing %s events",
-                                new Object[] {serverConnection.getName(),
-                                    Integer.valueOf(batchId),
-                                    Integer.valueOf(numberOfEvents)}),
+                                serverConnection.getName(), batchId, numberOfEvents),
                             e);
                     throw e;
                   }
@@ -541,9 +494,7 @@ public class GatewayReceiverCommand extends BaseCommand {
                     message =
                         "%s: The input region name for the batch destroy request %s is null";
                   }
-                  Object[] messageArgs =
-                      new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
-                  String s = String.format(message, messageArgs);
+                  String s = String.format(message, serverConnection.getName(), batchId);
                   logger.warn(s);
                   throw new Exception(s);
                 }
@@ -573,8 +524,7 @@ public class GatewayReceiverCommand extends BaseCommand {
                     serverConnection.setModificationInfo(true, regionName, key);
                   } catch (EntryNotFoundException e) {
                     logger.info("{}: during batch destroy no entry was found for key {}",
-                        new Object[] {serverConnection.getName(), key});
-                    // throw new Exception(e);
+                        serverConnection.getName(), key);
                   }
                   stats.incDestroyRequest();
                   retry = false;
@@ -582,14 +532,13 @@ public class GatewayReceiverCommand extends BaseCommand {
               } catch (Exception e) {
                 logger.warn(String.format(
                     "%s: Caught exception processing batch destroy request %s containing %s events",
-                    new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                        Integer.valueOf(numberOfEvents)}),
+                    serverConnection.getName(), batchId, numberOfEvents),
                     e);
                 handleException(removeOnException, stats, e);
               }
               break;
-            case 3: // Update Time-stamp for a RegionEntry
 
+            case 3: // Update Time-stamp for a RegionEntry
               try {
                 // Region name
                 regionNamePart = clientMessage.getPart(partNumber + 2);
@@ -627,9 +576,8 @@ public class GatewayReceiverCommand extends BaseCommand {
                   String message =
                       "%s: Caught exception processing batch update version request request %s containing %s events";
 
-                  Object[] messageArgs = new Object[] {serverConnection.getName(),
-                      Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)};
-                  String s = String.format(message, messageArgs);
+                  String s = String.format(message, serverConnection.getName(),
+                      batchId, numberOfEvents);
                   logger.warn(s);
                   throw new Exception(s);
 
@@ -657,7 +605,7 @@ public class GatewayReceiverCommand extends BaseCommand {
                     } catch (EntryNotFoundException e) {
                       logger.info(
                           "Entry for key {} was not found in Region {} during ProcessBatch for Update Entry Version",
-                          new Object[] {serverConnection.getName(), key});
+                          serverConnection.getName(), key);
                     }
                     retry = false;
                   }
@@ -665,17 +613,14 @@ public class GatewayReceiverCommand extends BaseCommand {
               } catch (Exception e) {
                 logger.warn(String.format(
                     "%s: Caught exception processing batch update version request request %s containing %s events",
-                    new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                        Integer.valueOf(numberOfEvents)}),
-                    e);
+                    serverConnection.getName(), batchId, numberOfEvents), e);
                 handleException(removeOnException, stats, e);
               }
 
               break;
             default:
               logger.fatal("{}: Unknown action type ({}) for batch from {}",
-                  new Object[] {serverConnection.getName(), Integer.valueOf(actionType),
-                      serverConnection.getSocketString()});
+                  serverConnection.getName(), actionType, serverConnection.getSocketString());
               stats.incUnknowsOperationsReceived();
           }
         } while (retry);
@@ -698,7 +643,7 @@ public class GatewayReceiverCommand extends BaseCommand {
           fatalException = e.getCause();
           logger.fatal(String.format(
               "This gateway receiver has received a PDX type from %s that does match the existing PDX type. This gateway receiver will not process any more events, in order to prevent receiving objects which may not be deserializable.",
-              new Object[] {serverConnection.getMembershipID()}), e.getCause());
+              serverConnection.getMembershipID()), e.getCause());
           break;
         }
 
@@ -707,9 +652,8 @@ public class GatewayReceiverCommand extends BaseCommand {
         DistributedSystem ds = crHelper.getCacheForGatewayCommand().getDistributedSystem();
         String exceptionMessage = String.format(
             "Exception occurred while processing a batch on the receiver running on DistributedSystem with Id: %s, DistributedMember on which the receiver is running: %s",
-            new Object[] {
-                ((InternalDistributedSystem) ds).getDistributionManager().getDistributedSystemId(),
-                ds.getDistributedMember()});
+            ((InternalDistributedSystem) ds).getDistributionManager().getDistributedSystemId(),
+            ds.getDistributedMember());
         BatchException70 be =
             new BatchException70(exceptionMessage, e, indexWithoutPDXEvent, batchId);
         exceptions.add(be);


[geode] 03/07: GEODE-6626: Expand GatewayReceiverFactoryImplTest

Posted by kl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch GEODE-6626-gatewayReceiver-metrics
in repository https://gitbox.apache.org/repos/asf/geode.git

commit beb537362c37100be65b72674e0335dc8a1eff52
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Apr 12 13:26:30 2019 -0700

    GEODE-6626: Expand GatewayReceiverFactoryImplTest
---
 .../wan/GatewayReceiverFactoryImplJUnitTest.java   | 135 ----------
 .../cache/wan/GatewayReceiverFactoryImplTest.java  | 293 +++++++++++++++++++++
 2 files changed, 293 insertions(+), 135 deletions(-)

diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImplJUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImplJUnitTest.java
deleted file mode 100644
index d01c874..0000000
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImplJUnitTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.wan;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import org.apache.geode.cache.wan.GatewayReceiver;
-import org.apache.geode.cache.wan.GatewayTransportFilter;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.xmlcache.CacheCreation;
-import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
-
-@RunWith(Parameterized.class)
-@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
-public class GatewayReceiverFactoryImplJUnitTest {
-  @Parameterized.Parameter
-  public static InternalCache cache;
-  private GatewayReceiverFactoryImpl gatewayReceiverFactory;
-
-  @Parameterized.Parameters(name = "{0}")
-  public static Collection<InternalCache> cacheTypes() throws Exception {
-    InternalCache gemfireCacheImpl = spy(mock(GemFireCacheImpl.class, "GemFireCacheImpl"));
-    when(gemfireCacheImpl.getDistributedSystem()).thenReturn(mock(InternalDistributedSystem.class));
-
-    InternalCache declarativeCacheImpl = spy(mock(CacheCreation.class, "CacheCreation"));
-
-    return Arrays.asList(new InternalCache[] {gemfireCacheImpl, declarativeCacheImpl});
-  }
-
-  @Before
-  public void setUp() {
-    gatewayReceiverFactory = spy(new GatewayReceiverFactoryImpl(cache));
-    gatewayReceiverFactory.setManualStart(true);
-  }
-
-  @Test
-  public void createWithDefaultAttributes() {
-    GatewayReceiver receiver = gatewayReceiverFactory.create();
-
-    assertThat(receiver.isManualStart()).isTrue();
-    assertThat(receiver.getGatewayTransportFilters()).isEmpty();
-    assertThat(receiver.getEndPort()).isEqualTo(GatewayReceiver.DEFAULT_END_PORT);
-    assertThat(receiver.getStartPort()).isEqualTo(GatewayReceiver.DEFAULT_START_PORT);
-    assertThat(receiver.getBindAddress()).isEqualTo(GatewayReceiver.DEFAULT_BIND_ADDRESS);
-    assertThat(receiver.getSocketBufferSize())
-        .isEqualTo(GatewayReceiver.DEFAULT_SOCKET_BUFFER_SIZE);
-    assertThat(receiver.getHostnameForSenders())
-        .isEqualTo(GatewayReceiver.DEFAULT_HOSTNAME_FOR_SENDERS);
-    assertThat(receiver.getMaximumTimeBetweenPings())
-        .isEqualTo(GatewayReceiver.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS);
-
-    verify(cache, times(1)).addGatewayReceiver(receiver);
-  }
-
-  @Test
-  public void createWithCustomAttributes() {
-    int endPort = 2500;
-    int startPort = 1500;
-    int socketBufferSize = 128;
-    int timeoutBetweenPings = 1;
-    String bindAddress = "kaos";
-    String hostnameForSenders = "kaos.com";
-    GatewayTransportFilter gatewayTransportFilter = mock(GatewayTransportFilter.class);
-
-    gatewayReceiverFactory.setEndPort(endPort);
-    gatewayReceiverFactory.setStartPort(startPort);
-    gatewayReceiverFactory.setBindAddress(bindAddress);
-    gatewayReceiverFactory.setSocketBufferSize(socketBufferSize);
-    gatewayReceiverFactory.setHostnameForSenders(hostnameForSenders);
-    gatewayReceiverFactory.setMaximumTimeBetweenPings(timeoutBetweenPings);
-    gatewayReceiverFactory.addGatewayTransportFilter(gatewayTransportFilter);
-    GatewayReceiver receiver = gatewayReceiverFactory.create();
-
-    assertThat(receiver.isManualStart()).isTrue();
-    assertThat(receiver.getEndPort()).isEqualTo(endPort);
-    assertThat(receiver.getStartPort()).isEqualTo(startPort);
-    assertThat(receiver.getBindAddress()).isEqualTo(bindAddress);
-    assertThat(receiver.getGatewayTransportFilters()).isNotEmpty();
-    assertThat(receiver.getSocketBufferSize()).isEqualTo(socketBufferSize);
-    assertThat(receiver.getHostnameForSenders()).isEqualTo(hostnameForSenders);
-    assertThat(receiver.getMaximumTimeBetweenPings()).isEqualTo(timeoutBetweenPings);
-    assertThat(receiver.getGatewayTransportFilters()).contains(gatewayTransportFilter);
-
-    verify(cache, times(1)).addGatewayReceiver(receiver);
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void createShouldThrowExceptionWhenPortRangeIsInvalid() {
-    gatewayReceiverFactory.setEndPort(1400);
-    gatewayReceiverFactory.setStartPort(1500);
-    gatewayReceiverFactory.create();
-
-    fail("Exception should have been thrown: endPort < startPort.");
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void createShouldThrownExceptionWhenGatewayReceiverAlreadyExists() {
-    Set mockReceivers = new HashSet();
-    mockReceivers.add(mock(GatewayReceiver.class));
-    when(cache.getGatewayReceivers()).thenReturn(mockReceivers);
-    gatewayReceiverFactory.create();
-
-    fail("Exception should have been thrown: a GatewayReceiver already exists on this cache.");
-  }
-}
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImplTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImplTest.java
new file mode 100644
index 0000000..d164eb9
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImplTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import static java.util.Collections.singleton;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runners.Parameterized.UseParametersRunnerFactory;
+
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.cache.wan.GatewayTransportFilter;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.xmlcache.CacheCreation;
+import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
+
+@RunWith(Parameterized.class)
+@UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
+public class GatewayReceiverFactoryImplTest {
+
+  @Parameter
+  public InternalCache cache;
+
+  private GatewayReceiverFactoryImpl gatewayReceiverFactory;
+
+  @Parameters(name = "{0}")
+  public static Collection<InternalCache> cacheTypes() {
+    InternalCache gemFireCacheImpl = mock(GemFireCacheImpl.class, "GemFireCacheImpl");
+    InternalCache cacheCreation = mock(CacheCreation.class, "CacheCreation");
+    InternalDistributedSystem system = mock(InternalDistributedSystem.class);
+
+    when(gemFireCacheImpl.addCacheServer(true)).thenReturn(mock(CacheServerImpl.class));
+    when(gemFireCacheImpl.getDistributedSystem()).thenReturn(system);
+    when(gemFireCacheImpl.getInternalDistributedSystem()).thenReturn(system);
+
+    return Arrays.asList(gemFireCacheImpl, cacheCreation);
+  }
+
+  @Before
+  public void setUp() {
+    when(cache.getGatewayReceivers()).thenReturn(Collections.emptySet());
+
+    gatewayReceiverFactory = new GatewayReceiverFactoryImpl(cache);
+  }
+
+  @Test
+  public void createDoesNotUseManualStartByDefault() {
+    GatewayReceiver receiver = gatewayReceiverFactory.create();
+
+    assertThat(receiver.isManualStart()).isFalse();
+  }
+
+  @Test
+  public void createUsesSpecifiedManualStart() {
+    gatewayReceiverFactory.setManualStart(true);
+
+    GatewayReceiver receiver = gatewayReceiverFactory.create();
+
+    assertThat(receiver.isManualStart()).isTrue();
+  }
+
+  @Test
+  public void createDoesNotUseGatewayTransportFiltersByDefault() {
+    GatewayReceiver receiver = gatewayReceiverFactory.create();
+
+    assertThat(receiver.getGatewayTransportFilters()).isEmpty();
+  }
+
+  @Test
+  public void createUsesSpecifiedGatewayTransportFilter() {
+    GatewayTransportFilter gatewayTransportFilter = mock(GatewayTransportFilter.class);
+    gatewayReceiverFactory.addGatewayTransportFilter(gatewayTransportFilter);
+
+    GatewayReceiver receiver = gatewayReceiverFactory.create();
+
+    assertThat(receiver.getGatewayTransportFilters()).containsOnly(gatewayTransportFilter);
+  }
+
+  @Test
+  public void createUsesMultipleSpecifiedGatewayTransportFilters() {
+    GatewayTransportFilter gatewayTransportFilter1 = mock(GatewayTransportFilter.class);
+    GatewayTransportFilter gatewayTransportFilter2 = mock(GatewayTransportFilter.class);
+    GatewayTransportFilter gatewayTransportFilter3 = mock(GatewayTransportFilter.class);
+    gatewayReceiverFactory.addGatewayTransportFilter(gatewayTransportFilter1);
+    gatewayReceiverFactory.addGatewayTransportFilter(gatewayTransportFilter2);
+    gatewayReceiverFactory.addGatewayTransportFilter(gatewayTransportFilter3);
+
+    GatewayReceiver receiver = gatewayReceiverFactory.create();
+
+    assertThat(receiver.getGatewayTransportFilters())
+        .containsExactlyInAnyOrder(gatewayTransportFilter1, gatewayTransportFilter2,
+            gatewayTransportFilter3);
+  }
+
+  @Test
+  public void createUsesEndPortDefault() {
+    int endPortDefault = 5500;
+
+    GatewayReceiver receiver = gatewayReceiverFactory.create();
+
+    assertThat(receiver.getEndPort()).isEqualTo(endPortDefault);
+  }
+
+  @Test
+  public void createUsesSpecifiedEndPort() {
+    int endPort = 6000;
+    gatewayReceiverFactory.setEndPort(endPort);
+
+    GatewayReceiver receiver = gatewayReceiverFactory.create();
+
+    assertThat(receiver.getEndPort()).isEqualTo(endPort);
+  }
+
+  @Test
+  public void createThrowsIllegalStateExceptionIfEndPortIsLessThanStartPortDefault() {
+    int endPort = 2500;
+    gatewayReceiverFactory.setEndPort(endPort);
+
+    Throwable thrown = catchThrowable(() -> gatewayReceiverFactory.create());
+
+    assertThat(thrown).isInstanceOf(IllegalStateException.class);
+  }
+
+  @Test
+  public void createUsesStartPortDefault() {
+    int startPortDefault = 5000;
+
+    GatewayReceiver receiver = gatewayReceiverFactory.create();
+
+    assertThat(receiver.getStartPort()).isEqualTo(startPortDefault);
+  }
+
+  @Test
+  public void createUsesSpecifiedStartPort() {
+    int startPort = 2500;
+    gatewayReceiverFactory.setStartPort(startPort);
+
+    GatewayReceiver receiver = gatewayReceiverFactory.create();
+
+    assertThat(receiver.getStartPort()).isEqualTo(startPort);
+  }
+
+  @Test
+  public void createThrowsIllegalStateExceptionIfSpecifiedStartPortIsGreaterThanEndPortDefault() {
+    int startPort = 6000;
+    gatewayReceiverFactory.setStartPort(startPort);
+
+    Throwable thrown = catchThrowable(() -> gatewayReceiverFactory.create());
+
+    assertThat(thrown).isInstanceOf(IllegalStateException.class);
+  }
+
+  @Test
+  public void createUsesSpecifiedStartPortAndEndPort() {
+    int startPort = 4000;
+    int endPort = 6000;
+    gatewayReceiverFactory.setStartPort(startPort);
+    gatewayReceiverFactory.setEndPort(endPort);
+
+    GatewayReceiver receiver = gatewayReceiverFactory.create();
+
+    assertThat(receiver.getStartPort()).isEqualTo(startPort);
+    assertThat(receiver.getEndPort()).isEqualTo(endPort);
+  }
+
+  @Test
+  public void createThrowsIllegalStateExceptionIfSpecifiedEndPortIsLessThanSpecifiedStartPort() {
+    int startPort = 6000;
+    int endPort = 4000;
+    gatewayReceiverFactory.setStartPort(startPort);
+    gatewayReceiverFactory.setEndPort(endPort);
+
+    Throwable thrown = catchThrowable(() -> gatewayReceiverFactory.create());
+
+    assertThat(thrown).isInstanceOf(IllegalStateException.class);
+  }
+
+  @Test
+  public void createUsesBindAddressDefault() {
+    GatewayReceiver receiver = gatewayReceiverFactory.create();
+
+    assertThat(receiver.getBindAddress()).isEqualTo("");
+  }
+
+  @Test
+  public void createUsesSpecifiedBindAddress() {
+    String bindAddress = "kaos";
+    gatewayReceiverFactory.setBindAddress(bindAddress);
+
+    GatewayReceiver receiver = gatewayReceiverFactory.create();
+
+    assertThat(receiver.getBindAddress()).isEqualTo(bindAddress);
+  }
+
+  @Test
+  public void createUsesSocketBufferSizeDefault() {
+    String socketBufferSizeDefault = "524288";
+
+    GatewayReceiver receiver = gatewayReceiverFactory.create();
+
+    assertThat(receiver.getSocketBufferSize()).isEqualTo(Integer.valueOf(socketBufferSizeDefault));
+  }
+
+  @Test
+  public void createUsesSpecifiedSocketBufferSize() {
+    int socketBufferSize = 128;
+    gatewayReceiverFactory.setSocketBufferSize(socketBufferSize);
+
+    GatewayReceiver receiver = gatewayReceiverFactory.create();
+
+    assertThat(receiver.getSocketBufferSize()).isEqualTo(socketBufferSize);
+  }
+
+  @Test
+  public void createUsesHostnameForSendersDefault() {
+    GatewayReceiver receiver = gatewayReceiverFactory.create();
+
+    assertThat(receiver.getHostnameForSenders()).isEqualTo("");
+  }
+
+  @Test
+  public void createUsesSpecifiedHostnameForSenders() {
+    String hostnameForSenders = "kaos.com";
+    gatewayReceiverFactory.setHostnameForSenders(hostnameForSenders);
+
+    GatewayReceiver receiver = gatewayReceiverFactory.create();
+
+    assertThat(receiver.getHostnameForSenders()).isEqualTo(hostnameForSenders);
+  }
+
+  @Test
+  public void createUsesMaximumTimeBetweenPingsDefault() {
+    int maximumTimeBetweenPingsDefault = 60000;
+
+    GatewayReceiver receiver = gatewayReceiverFactory.create();
+
+    assertThat(receiver.getMaximumTimeBetweenPings()).isEqualTo(maximumTimeBetweenPingsDefault);
+  }
+
+  @Test
+  public void createUsesSpecifiedMaximumTimeBetweenPings() {
+    int timeoutBetweenPings = 1;
+    gatewayReceiverFactory.setMaximumTimeBetweenPings(timeoutBetweenPings);
+
+    GatewayReceiver receiver = gatewayReceiverFactory.create();
+
+    assertThat(receiver.getMaximumTimeBetweenPings()).isEqualTo(timeoutBetweenPings);
+  }
+
+  @Test
+  public void createAddsGatewayReceiverToCache() {
+    GatewayReceiver receiver = gatewayReceiverFactory.create();
+
+    verify(cache).addGatewayReceiver(receiver);
+  }
+
+  @Test
+  public void createThrowsIllegalStateExceptionIfGatewayReceiverAlreadyExists() {
+    when(cache.getGatewayReceivers()).thenReturn(singleton(mock(GatewayReceiver.class)));
+
+    Throwable thrown = catchThrowable(() -> gatewayReceiverFactory.create());
+
+    assertThat(thrown).isInstanceOf(IllegalStateException.class);
+  }
+}


[geode] 07/07: wip

Posted by kl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch GEODE-6626-gatewayReceiver-metrics
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 2e0b5b1404f53fe931e88555ff38f90ec51d22b0
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Apr 12 15:13:01 2019 -0700

    wip
---
 .../internal/cache/wan/GatewayReceiverMetrics.java | 35 ++++++++++
 .../command/GatewayReceiverCommandTest.java        | 81 ++++++++++++++++++++++
 .../cache/wan/GatewayReceiverMetricsTest.java      | 52 ++++++++++++++
 .../cache/wan/GatewayReceiverFactoryImpl.java      |  5 +-
 .../cache/wan/GatewayReceiverFactoryImplTest.java  | 11 +++
 .../cache/wan/GatewayReceiverImplTest.java         | 32 ++++++---
 6 files changed, 206 insertions(+), 10 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverMetrics.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverMetrics.java
new file mode 100644
index 0000000..d340ebe
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverMetrics.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.MeterRegistry;
+
+public class GatewayReceiverMetrics {
+
+  private final MeterRegistry meterRegistry;
+  private final Counter eventsReceivedCounter;
+
+  public GatewayReceiverMetrics(MeterRegistry meterRegistry) {
+    this.meterRegistry = meterRegistry;
+
+    eventsReceivedCounter = Counter.builder("cache.gatewayreceiver.events.received")
+        .register(meterRegistry);
+  }
+
+  public void close() {
+    meterRegistry.remove(eventsReceivedCounter);
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommandTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommandTest.java
new file mode 100644
index 0000000..b6096bd
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommandTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.mockito.quality.Strictness;
+
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.Part;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
+import org.apache.geode.internal.security.SecurityService;
+
+public class GatewayReceiverCommandTest {
+
+  @Rule
+  public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
+
+  @Mock
+  private Message clientMessage;
+  @Mock
+  private ServerConnection serverConnection;
+  @Mock
+  private SecurityService securityService;
+  @Mock
+  private InternalCache cache;
+
+  private long start;
+
+  private MeterRegistry meterRegistry;
+
+  @Before
+  public void setUp() {
+    start = 1;
+    meterRegistry = new SimpleMeterRegistry();
+
+    when(cache.getMeterRegistry()).thenReturn(meterRegistry);
+    when(clientMessage.getPart(anyInt())).thenReturn(mock(Part.class));
+    when(serverConnection.getCache()).thenReturn(cache);
+    when(serverConnection.getCacheServerStats()).thenReturn(mock(GatewayReceiverStats.class));
+    when(serverConnection.getResponseMessage()).thenReturn(mock(Message.class));
+  }
+
+  @Test
+  public void foo() throws Exception {
+    GatewayReceiverCommand command = new GatewayReceiverCommand();
+    command.cmdExecute(clientMessage, serverConnection, securityService, start);
+
+    Counter counter = meterRegistry.find("cache.gatewayreceiver.events.received")
+        .counter();
+
+    assertThat(counter).isNotNull();
+    assertThat(counter.count()).isEqualTo(1L);
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverMetricsTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverMetricsTest.java
new file mode 100644
index 0000000..a1c5f51
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverMetricsTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import org.junit.Before;
+import org.junit.Test;
+
+public class GatewayReceiverMetricsTest {
+
+  private MeterRegistry meterRegistry;
+
+  @Before
+  public void setUp() {
+    meterRegistry = new SimpleMeterRegistry();
+  }
+
+  @Test
+  public void createsEventsReceivedCounter() throws Exception {
+    GatewayReceiverMetrics metrics = new GatewayReceiverMetrics(meterRegistry);
+    Counter counter = meterRegistry.find("cache.gatewayreceiver.events.received")
+        .counter();
+
+    assertThat(counter).isNotNull();
+  }
+
+  @Test
+  public void closeRemovesEventsReceivedCounter() throws Exception {
+    GatewayReceiverMetrics metrics = new GatewayReceiverMetrics(meterRegistry);
+    metrics.close();
+    Counter counter = meterRegistry.find("cache.gatewayreceiver.events.received")
+        .counter();
+
+    assertThat(counter).isNull();
+  }
+}
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java
index f3b52bf..fbe5192 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java
@@ -20,6 +20,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import io.micrometer.core.instrument.MeterRegistry;
+
 import org.apache.geode.cache.wan.GatewayReceiver;
 import org.apache.geode.cache.wan.GatewayReceiverFactory;
 import org.apache.geode.cache.wan.GatewayTransportFilter;
@@ -129,9 +131,10 @@ public class GatewayReceiverFactoryImpl implements GatewayReceiverFactory {
 
     GatewayReceiver recv = null;
     if (this.cache instanceof GemFireCacheImpl) {
+      MeterRegistry meterRegistry = cache.getMeterRegistry();
       recv = new GatewayReceiverImpl(this.cache, this.startPort, this.endPort, this.timeBetPings,
           this.socketBuffSize, this.bindAdd, this.filters, this.hostnameForSenders,
-          this.manualStart);
+          this.manualStart, meterRegistry);
       this.cache.addGatewayReceiver(recv);
       InternalDistributedSystem system =
           (InternalDistributedSystem) this.cache.getDistributedSystem();
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImplTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImplTest.java
index d164eb9..fff5c0f 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImplTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImplTest.java
@@ -25,6 +25,8 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -50,6 +52,7 @@ public class GatewayReceiverFactoryImplTest {
   public InternalCache cache;
 
   private GatewayReceiverFactoryImpl gatewayReceiverFactory;
+  private MeterRegistry meterRegistry;
 
   @Parameters(name = "{0}")
   public static Collection<InternalCache> cacheTypes() {
@@ -67,6 +70,7 @@ public class GatewayReceiverFactoryImplTest {
   @Before
   public void setUp() {
     when(cache.getGatewayReceivers()).thenReturn(Collections.emptySet());
+    when(cache.getMeterRegistry()).thenReturn(meterRegistry);
 
     gatewayReceiverFactory = new GatewayReceiverFactoryImpl(cache);
   }
@@ -283,6 +287,13 @@ public class GatewayReceiverFactoryImplTest {
   }
 
   @Test
+  public void createUsesMeterRegistryFromCache() {
+    gatewayReceiverFactory.create();
+
+    verify(cache).getMeterRegistry();
+  }
+
+  @Test
   public void createThrowsIllegalStateExceptionIfGatewayReceiverAlreadyExists() {
     when(cache.getGatewayReceivers()).thenReturn(singleton(mock(GatewayReceiver.class)));
 
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplTest.java
index ca6819e..3a040c9 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplTest.java
@@ -31,6 +31,8 @@ import java.net.SocketException;
 import java.net.UnknownHostException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import io.micrometer.core.instrument.MeterRegistry;
+import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -40,11 +42,18 @@ import org.apache.geode.internal.net.SocketCreator;
 
 public class GatewayReceiverImplTest {
 
+  private MeterRegistry meterRegistry;
+
+  @Before
+  public void setUp() {
+    meterRegistry = mock(MeterRegistry.class);
+  }
+
   @Test
   public void getHostOnUnstartedGatewayShouldReturnLocalhost() throws UnknownHostException {
     InternalCache cache = mock(InternalCache.class);
     GatewayReceiverImpl gateway =
-        new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, true);
+        new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, true, meterRegistry);
     assertEquals(SocketCreator.getLocalHost().getHostName(), gateway.getHost());
   }
 
@@ -57,7 +66,7 @@ public class GatewayReceiverImplTest {
     when(server.getExternalAddress()).thenReturn("hello");
     when(cache.addCacheServer(eq(true))).thenReturn(server);
     GatewayReceiverImpl gateway =
-        new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, true);
+        new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, true, meterRegistry);
     gateway.start();
     assertEquals("hello", gateway.getHost());
   }
@@ -72,7 +81,7 @@ public class GatewayReceiverImplTest {
     when(server.isRunning()).thenReturn(true);
     when(cache.addCacheServer(eq(true))).thenReturn(server);
     GatewayReceiverImpl gateway =
-        new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, true);
+        new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, true, meterRegistry);
     gateway.start();
     try {
       gateway.destroy();
@@ -91,7 +100,7 @@ public class GatewayReceiverImplTest {
     when(server.getExternalAddress()).thenReturn("hello");
     when(cache.addCacheServer(eq(true))).thenReturn(server);
     GatewayReceiverImpl gateway =
-        new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, true);
+        new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, true, meterRegistry);
     gateway.start();
     // sender is mocked already to say running is false
     gateway.destroy();
@@ -107,7 +116,7 @@ public class GatewayReceiverImplTest {
     when(server.getExternalAddress()).thenReturn("hello");
     when(cache.addCacheServer(eq(true))).thenReturn(server);
     GatewayReceiverImpl gateway =
-        new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, true);
+        new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, true, meterRegistry);
     gateway.start();
     // sender is mocked already to say running is false
     gateway.destroy();
@@ -121,7 +130,7 @@ public class GatewayReceiverImplTest {
     when(cache.addCacheServer(eq(true))).thenReturn(server);
     doThrow(new SocketException("Address already in use")).when(server).start();
     GatewayReceiverImpl gateway =
-        new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, true);
+        new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, true, meterRegistry);
     assertThatThrownBy(() -> gateway.start()).isInstanceOf(GatewayReceiverException.class)
         .hasMessageContaining("No available free port found in the given range");
     verify(server, times(2)).start();
@@ -134,7 +143,7 @@ public class GatewayReceiverImplTest {
     when(cache.addCacheServer(eq(true))).thenReturn(server);
     doThrow(new SocketException("Address already in use")).when(server).start();
     GatewayReceiverImpl gateway =
-        new GatewayReceiverImpl(cache, 2000, 2000, 5, 100, null, null, null, true);
+        new GatewayReceiverImpl(cache, 2000, 2000, 5, 100, null, null, null, true, meterRegistry);
     assertThatThrownBy(() -> gateway.start()).isInstanceOf(GatewayReceiverException.class)
         .hasMessageContaining("No available free port found in the given range");
     verify(server, times(1)).start();
@@ -147,7 +156,7 @@ public class GatewayReceiverImplTest {
     when(cache.addCacheServer(eq(true))).thenReturn(server);
     doThrow(new SocketException("Address already in use")).when(server).start();
     GatewayReceiverImpl gateway =
-        new GatewayReceiverImpl(cache, 2000, 2100, 5, 100, null, null, null, true);
+        new GatewayReceiverImpl(cache, 2000, 2100, 5, 100, null, null, null, true, meterRegistry);
     assertThatThrownBy(() -> gateway.start()).isInstanceOf(GatewayReceiverException.class)
         .hasMessageContaining("No available free port found in the given range");
     assertTrue(gateway.getPort() == 0);
@@ -171,10 +180,15 @@ public class GatewayReceiverImplTest {
       return 0;
     }).when(server).start();
     GatewayReceiverImpl gateway =
-        new GatewayReceiverImpl(cache, 2000, 2010, 5, 100, null, null, null, true);
+        new GatewayReceiverImpl(cache, 2000, 2010, 5, 100, null, null, null, true, meterRegistry);
     gateway.start();
     assertTrue(gateway.getPort() >= 2000);
     assertEquals(2, callCount.get());
     verify(server, times(3)).start(); // 2 failed tries, 1 succeeded
   }
+
+  @Test
+  public void createsEventsReceivedCounter() {
+
+  }
 }


[geode] 05/07: GEODE-6626: Remove unused method from GatewayReceiverMBean

Posted by kl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch GEODE-6626-gatewayReceiver-metrics
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 4d23e7ea8485e6efb5151d0fda1928c76c806d14
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Apr 12 15:09:52 2019 -0700

    GEODE-6626: Remove unused method from GatewayReceiverMBean
    
    Co-authored-by: Michael Oleske <mo...@pivotal.io>
---
 .../apache/geode/management/internal/beans/GatewayReceiverMBean.java  | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBean.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBean.java
index 4b5cff9..2bd94c5 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBean.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBean.java
@@ -189,10 +189,6 @@ public class GatewayReceiverMBean extends NotificationBroadcasterSupport
     return bridge.getTotalSentBytes();
   }
 
-  public void stopMonior() {
-    bridge.stopMonitor();
-  }
-
   @Override
   public String[] getConnectedGatewaySenders() {
     return bridge.getConnectedGatewaySenders();