You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mi...@apache.org on 2022/06/28 20:58:07 UTC

[geode] branch develop updated: GEODE-10020: For Ping task avoid registering new destination endpoint (#7749)

This is an automated email from the ASF dual-hosted git repository.

mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 24613d9f1a GEODE-10020: For Ping task avoid registering new destination endpoint (#7749)
24613d9f1a is described below

commit 24613d9f1a759f9c94009a75fd935b1f0f8383e3
Author: Mario Ivanac <48...@users.noreply.github.com>
AuthorDate: Tue Jun 28 22:58:00 2022 +0200

    GEODE-10020: For Ping task avoid registering new destination endpoint (#7749)
    
    * GEODE-10020: For Ping task avoid registering new destination endpoint
---
 ...iversWithSamePortAndHostnameForSendersTest.java |  48 +++-
 .../cache/client/internal/ConnectionImpl.java      |  21 +-
 .../cache/client/internal/OpExecutorImpl.java      |  17 +-
 .../geode/distributed/internal/ServerLocation.java |  15 +-
 .../internal/ServerLocationExtension.java          |  35 +++
 .../cache/client/internal/ConnectionImplTest.java  | 253 +++++++++++++++++++++
 .../internal/ServerLocationExtensionTest.java      |  86 +++++++
 .../distributed/internal/ServerLocationTest.java   |  72 ++++++
 8 files changed, 537 insertions(+), 10 deletions(-)

diff --git a/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java b/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
index 485d91dc3d..1501247671 100644
--- a/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
+++ b/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
@@ -46,6 +46,7 @@ import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.internal.EndpointManager;
 import org.apache.geode.cache.persistence.PartitionOfflineException;
 import org.apache.geode.distributed.Locator;
 import org.apache.geode.internal.cache.ForceReattemptException;
@@ -215,7 +216,6 @@ public class SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest {
 
   }
 
-
   /**
    * The aim of this test is verify that when several gateway receivers in a remote site share the
    * same port and hostname-for-senders, the pings sent from the gateway senders reach the right
@@ -263,6 +263,44 @@ public class SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest {
   }
 
 
+  /**
+   * The aim of this test is to verify that when several gateway receivers in a remote site share
+   * the same port and hostname-for-senders, the pings sent from the gateway senders reach the
+   * right gateway receiver and not just any of the receivers. Check that only one destination will
+   * be pinged.
+   */
+  @Test
+  public void testPingsToReceiversWithSamePortAndHostnameForSendersReachTheRightReceiver()
+      throws InterruptedException {
+    String senderId = "ln";
+    String regionName = "region-wan";
+    final int remoteLocPort = docker.getExternalPortForService("haproxy", 20334);
+
+    int locPort = createLocator(VM.getVM(0), 1, remoteLocPort);
+
+    VM vm1 = VM.getVM(1);
+    createCache(vm1, locPort);
+
+    // We use one dispatcher thread. With just one dispatcher thread, only one
+    // connection will be created by the sender towards one of the receivers and it will be
+    // monitored by the one ping thread for that remote receiver.
+    createGatewaySender(vm1, senderId, 2, true, 5,
+        1, GatewaySender.DEFAULT_ORDER_POLICY);
+
+    createPartitionedRegion(vm1, regionName, senderId, 0, 10);
+
+    int NUM_PUTS = 1;
+
+    putKeyValues(vm1, NUM_PUTS, regionName);
+
+    await().untilAsserted(() -> {
+      assertThat(getQueuedEvents(vm1, senderId)).isEqualTo(0);
+      assertThat(getSenderPoolDisconnects(vm1, senderId)).isEqualTo(0);
+      assertThat(getPoolEndPointSize(vm1, senderId)).isEqualTo(1);
+    });
+
+  }
+
   private boolean allDispatchersConnectedToSameReceiver(int server) {
 
     String gfshOutput = runListGatewayReceiversCommandInServer(server);
@@ -396,6 +434,14 @@ public class SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest {
     });
   }
 
+  private static int getPoolEndPointSize(VM vm, String senderId) {
+    return vm.invoke(() -> {
+      AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
+      EndpointManager manager = sender.getProxy().getEndpointManager();
+      return manager.getEndpointMap().size();
+    });
+  }
+
   private static int getSenderPoolDisconnects(VM vm, String senderId) {
     return vm.invoke(() -> {
       AbstractGatewaySender sender =
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
index aa0d4754f6..dfbacbedab 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
@@ -33,6 +33,8 @@ import org.apache.geode.cache.client.SocketFactory;
 import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.distributed.internal.ServerLocationAndMemberId;
+import org.apache.geode.distributed.internal.ServerLocationExtension;
 import org.apache.geode.distributed.internal.tcpserver.HostAndPort;
 import org.apache.geode.internal.cache.tier.ClientSideHandshake;
 import org.apache.geode.internal.cache.tier.CommunicationMode;
@@ -111,7 +113,17 @@ public class ConnectionImpl implements Connection {
     }
     theSocket.setSoTimeout(readTimeout);
 
-    endpoint = endpointManager.referenceEndpoint(location, status.getMemberId());
+    Endpoint tempEndpoint = null;
+
+    if (location instanceof ServerLocationExtension) {
+      tempEndpoint = getEndpoint(endpointManager, (ServerLocationExtension) location);
+    }
+
+    if (tempEndpoint == null || tempEndpoint.isClosed()) {
+      tempEndpoint = endpointManager.referenceEndpoint(location, status.getMemberId());
+    }
+
+    endpoint = tempEndpoint;
     connectFinished = true;
     endpoint.getStats().incConnections(1);
     return status;
@@ -203,6 +215,13 @@ public class ConnectionImpl implements Connection {
     }
   }
 
+  Endpoint getEndpoint(EndpointManager endpointManager,
+      ServerLocationExtension serverLocationExtension) {
+    ServerLocationAndMemberId serverLocationAndMemberId =
+        serverLocationExtension.getServerLocationAndMemberId();
+    return endpointManager.getEndpointMap().get(serverLocationAndMemberId);
+  }
+
   @Override
   public ByteBuffer getCommBuffer() throws SocketException {
     if (isDestroyed()) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
index 4a1b7828cb..335f077831 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
@@ -55,6 +55,7 @@ import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.FunctionInvocationTargetException;
 import org.apache.geode.distributed.internal.ServerLocation;
 import org.apache.geode.distributed.internal.ServerLocationAndMemberId;
+import org.apache.geode.distributed.internal.ServerLocationExtension;
 import org.apache.geode.internal.cache.PutAllPartialResultException;
 import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
 import org.apache.geode.internal.cache.tier.BatchException;
@@ -319,12 +320,14 @@ public class OpExecutorImpl implements ExecutablePool {
     if (op instanceof PingOp.PingOpImpl) {
       // currently for pings we prefer to queue clientToServer cnx so that we will
       // not create a pooled cnx when all we have is queue connections.
+      final ServerLocationAndMemberId serverLocationAndMemberId =
+          new ServerLocationAndMemberId(server,
+              ((PingOp.PingOpImpl) op).getServerID().getUniqueId());
+
       if (queueManager != null) {
         // see if our QueueManager has a connection to this server that we can send
         // the ping on.
-        final ServerLocationAndMemberId slAndMId = new ServerLocationAndMemberId(server,
-            ((PingOp.PingOpImpl) op).getServerID().getUniqueId());
-        final Endpoint endpoint = endpointManager.getEndpointMap().get(slAndMId);
+        final Endpoint endpoint = endpointManager.getEndpointMap().get(serverLocationAndMemberId);
         if (endpoint != null) {
           QueueConnections queueConnections = queueManager.getAllConnectionsNoWait();
           connection = queueConnections.getConnection(endpoint);
@@ -334,8 +337,12 @@ public class OpExecutorImpl implements ExecutablePool {
           }
         }
       }
-    }
-    if (connection == null) {
+      if (connection == null) {
+        ServerLocationExtension sle = new ServerLocationExtension(serverLocationAndMemberId);
+        connection = connectionManager.borrowConnection(sle, singleServerTimeout,
+            onlyUseExistingConnection);
+      }
+    } else {
       connection = connectionManager.borrowConnection(server, singleServerTimeout,
           onlyUseExistingConnection);
     }
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocation.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocation.java
index 069992a876..a3c187880b 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocation.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocation.java
@@ -57,15 +57,24 @@ public class ServerLocation implements DataSerializable, Comparable<ServerLocati
   /**
    * For DataSerializer
    */
-  public ServerLocation() {
-
-  }
+  public ServerLocation() {}
 
   public ServerLocation(String hostName, int port) {
     this.hostName = hostName;
     this.port = port;
   }
 
+  public ServerLocation(ServerLocation other) {
+    this.hostName = other.hostName;
+    this.port = other.port;
+    this.userId = other.userId;
+    int tempRequiresCredentials = other.requiresCredentials.get();
+    if (tempRequiresCredentials != INITIAL_REQUIRES_CREDENTIALS) {
+      this.requiresCredentials.set(tempRequiresCredentials);
+    }
+  }
+
+
   public String getHostName() {
     return hostName;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocationExtension.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocationExtension.java
new file mode 100644
index 0000000000..50f546e54e
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocationExtension.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal;
+
+
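+/**
+ * A {@link ServerLocation} that also carries the {@link ServerLocationAndMemberId} it was created
+ * from. Connection setup uses it to look up the endpoint already registered for that exact member
+ * instead of registering a new one.
+ */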
+public class ServerLocationExtension extends ServerLocation {
+  private final ServerLocationAndMemberId serverLocationAndMemberId;
+
+  public ServerLocationExtension(ServerLocationAndMemberId serverLocationMember) {
+    super(serverLocationMember.getServerLocation());
+    this.serverLocationAndMemberId = serverLocationMember;
+  }
+
+  public ServerLocationAndMemberId getServerLocationAndMemberId() {
+    return serverLocationAndMemberId;
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ConnectionImplTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ConnectionImplTest.java
new file mode 100644
index 0000000000..639be64716
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ConnectionImplTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.client.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.geode.cache.client.SocketFactory;
+import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.distributed.internal.ServerLocationAndMemberId;
+import org.apache.geode.distributed.internal.ServerLocationExtension;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.tcpserver.ClientSocketCreator;
+import org.apache.geode.internal.cache.tier.ClientSideHandshake;
+import org.apache.geode.internal.cache.tier.CommunicationMode;
+import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus;
+import org.apache.geode.internal.net.SocketCreator;
+
+public class ConnectionImplTest {
+
+  private ConnectionImpl connection;
+  private EndpointManagerImpl endpointManager;
+
+  private Endpoint endPoint1;
+  private ServerLocationAndMemberId serverLocationAndMemberId1;
+
+  private Endpoint endPoint2;
+  private ServerLocationAndMemberId serverLocationAndMemberId2;
+
+  private Endpoint endPoint3;
+  private ServerLocationAndMemberId serverLocationAndMemberId3;
+
+  private ServerLocationAndMemberId serverLocationAndMemberId4;
+
+  private Map<ServerLocationAndMemberId, Endpoint> endpointMap = new HashMap<>();
+
+  @BeforeEach
+  public void init() throws Exception {
+    connection = new ConnectionImpl(mock(InternalDistributedSystem.class));
+    endpointManager = mock(EndpointManagerImpl.class);
+    when(endpointManager.getEndpointMap()).thenReturn(endpointMap);
+    // Endpoint 1: localhost:1, member with view id 1 (registered in the map).
+    ServerLocation serverLocation1 = new ServerLocation("localhost", 1);
+    InternalDistributedMember distributedMember1 = new InternalDistributedMember("localhost", 1);
+    distributedMember1.setVmViewId(1);
+    String uniqueId1 = distributedMember1.getUniqueId();
+
+    serverLocationAndMemberId1 =
+        new ServerLocationAndMemberId(serverLocation1, uniqueId1);
+    endPoint1 = new Endpoint(endpointManager, mock(DistributedSystem.class), serverLocation1,
+        mock(ConnectionStats.class), distributedMember1);
+
+    endpointMap.put(serverLocationAndMemberId1, endPoint1);
+    // Endpoint 2: localhost:2, view id 2; its key must use member 2's own unique id.
+    ServerLocation serverLocation2 = new ServerLocation("localhost", 2);
+    InternalDistributedMember distributedMember2 = new InternalDistributedMember("localhost", 2);
+    distributedMember2.setVmViewId(2);
+    String uniqueId2 = distributedMember2.getUniqueId();
+
+    serverLocationAndMemberId2 =
+        new ServerLocationAndMemberId(serverLocation2, uniqueId2);
+    endPoint2 = new Endpoint(endpointManager, mock(DistributedSystem.class), serverLocation2,
+        mock(ConnectionStats.class), distributedMember2);
+
+    endpointMap.put(serverLocationAndMemberId2, endPoint2);
+    // Endpoint 3: same location as endpoint 1 but a different member (view id 3).
+    InternalDistributedMember distributedMember3 = new InternalDistributedMember("localhost", 1);
+    distributedMember3.setVmViewId(3);
+    String uniqueId3 = distributedMember3.getUniqueId();
+
+    serverLocationAndMemberId3 =
+        new ServerLocationAndMemberId(serverLocation1, uniqueId3);
+    endPoint3 = new Endpoint(endpointManager, mock(DistributedSystem.class), serverLocation1,
+        mock(ConnectionStats.class), distributedMember3);
+
+    endpointMap.put(serverLocationAndMemberId3, endPoint3);
+
+    // Key 4: same location again with view id 4; deliberately NOT registered in the map, so
+    // looking it up must yield no endpoint.
+    InternalDistributedMember distributedMember4 = new InternalDistributedMember("localhost", 1);
+    distributedMember4.setVmViewId(4);
+    String uniqueId4 = distributedMember4.getUniqueId();
+
+    serverLocationAndMemberId4 =
+        new ServerLocationAndMemberId(serverLocation1, uniqueId4);
+  }
+
+  @Test
+  public void testGetEndpoint1() throws Exception {
+    ServerLocationExtension serverLocationExtension1 =
+        new ServerLocationExtension(serverLocationAndMemberId1);
+
+    assertThat(connection.getEndpoint(endpointManager, serverLocationExtension1))
+        .isEqualTo(endPoint1);
+  }
+
+  @Test
+  public void testGetTwoEndpointsForDifferentServerLocations() throws Exception {
+    ServerLocationExtension serverLocationExtension1 =
+        new ServerLocationExtension(serverLocationAndMemberId1);
+
+    ServerLocationExtension serverLocationExtension2 =
+        new ServerLocationExtension(serverLocationAndMemberId2);
+
+    assertThat(serverLocationExtension1).isNotEqualTo(serverLocationExtension2);
+
+    assertThat(connection.getEndpoint(endpointManager, serverLocationExtension1))
+        .isEqualTo(endPoint1);
+    assertThat(connection.getEndpoint(endpointManager, serverLocationExtension2))
+        .isEqualTo(endPoint2);
+
+    assertThat(endPoint1).isNotEqualTo(endPoint2);
+  }
+
+
+  @Test
+  public void testGetTwoEndpointsForEqualServerLocations() throws Exception {
+    ServerLocationExtension serverLocationExtension1 =
+        new ServerLocationExtension(serverLocationAndMemberId1);
+
+    ServerLocationExtension serverLocationExtension3 =
+        new ServerLocationExtension(serverLocationAndMemberId3);
+
+    assertThat(serverLocationExtension1).isEqualTo(serverLocationExtension3);
+
+    assertThat(connection.getEndpoint(endpointManager, serverLocationExtension1))
+        .isEqualTo(endPoint1);
+    assertThat(connection.getEndpoint(endpointManager, serverLocationExtension3))
+        .isEqualTo(endPoint3);
+
+    assertThat(endPoint1).isNotEqualTo(endPoint3);
+  }
+
+  @Test
+  public void testGetEndpointsForEqualServerLocationsBtOnlyOneExist() throws Exception {
+    ServerLocationExtension serverLocationExtension3 =
+        new ServerLocationExtension(serverLocationAndMemberId3);
+
+    ServerLocationExtension serverLocationExtension4 =
+        new ServerLocationExtension(serverLocationAndMemberId4);
+
+    assertThat(serverLocationExtension3).isEqualTo(serverLocationExtension4);
+
+    assertThat(connection.getEndpoint(endpointManager, serverLocationExtension3))
+        .isEqualTo(endPoint3);
+    assertThat(connection.getEndpoint(endpointManager, serverLocationExtension4))
+        .isNull();
+
+  }
+
+
+  @Test
+  public void testConnectConnectionForPingTask() throws Exception {
+
+    ServerLocationExtension serverLocationExtension1 =
+        new ServerLocationExtension(serverLocationAndMemberId1);
+    ClientSideHandshake handshake = mock(ClientSideHandshake.class);
+    SocketCreator socketCreator = mock(SocketCreator.class);
+    SocketFactory socketFactory = mock(SocketFactory.class);
+
+    ClientSocketCreator clientSocketCreator = mock(ClientSocketCreator.class);
+    when(socketCreator.forClient()).thenReturn(clientSocketCreator);
+
+    Socket socket = mock(Socket.class);
+    when(clientSocketCreator.connect(any(), anyInt(), anyInt(), any())).thenReturn(socket);
+
+    connection.connect(endpointManager, serverLocationExtension1, handshake, 4096, 0, 0,
+        CommunicationMode.ClientToServer, null, socketCreator, socketFactory);
+
+    verify(endpointManager, times(0)).referenceEndpoint(any(), any());
+
+  }
+
+
+  @Test
+  public void testConnectConnectionForNotPingTask() throws Exception {
+    ServerLocation serverLocation1 = new ServerLocation("localhost", 1);
+
+    ClientSideHandshake handshake = mock(ClientSideHandshake.class);
+    SocketCreator socketCreator = mock(SocketCreator.class);
+    SocketFactory socketFactory = mock(SocketFactory.class);
+
+    ServerQueueStatus status = mock(ServerQueueStatus.class);
+    when(handshake.handshakeWithServer(any(), any(), any())).thenReturn(status);
+
+    ClientSocketCreator clientSocketCreator = mock(ClientSocketCreator.class);
+    when(socketCreator.forClient()).thenReturn(clientSocketCreator);
+
+    Socket socket = mock(Socket.class);
+    when(clientSocketCreator.connect(any(), anyInt(), anyInt(), any())).thenReturn(socket);
+    when(endpointManager.referenceEndpoint(any(), any())).thenReturn(endPoint1);
+
+    connection.connect(endpointManager, serverLocation1, handshake, 4096, 0, 0,
+        CommunicationMode.ClientToServer, null, socketCreator, socketFactory);
+
+    verify(endpointManager, times(1)).referenceEndpoint(any(), any());
+
+  }
+
+  @Test
+  public void testConnectConnectionForPingTaskWhileEndpointIsClosed() throws Exception {
+    ServerLocationExtension serverLocationExtension1 =
+        new ServerLocationExtension(serverLocationAndMemberId1);
+
+    ClientSideHandshake handshake = mock(ClientSideHandshake.class);
+    SocketCreator socketCreator = mock(SocketCreator.class);
+    SocketFactory socketFactory = mock(SocketFactory.class);
+
+    ServerQueueStatus status = mock(ServerQueueStatus.class);
+    when(handshake.handshakeWithServer(any(), any(), any())).thenReturn(status);
+
+    ClientSocketCreator clientSocketCreator = mock(ClientSocketCreator.class);
+    when(socketCreator.forClient()).thenReturn(clientSocketCreator);
+
+    Socket socket = mock(Socket.class);
+    when(clientSocketCreator.connect(any(), anyInt(), anyInt(), any())).thenReturn(socket);
+    endPoint1.close();
+
+    when(endpointManager.referenceEndpoint(any(), any())).thenReturn(endPoint2);
+
+    connection.connect(endpointManager, serverLocationExtension1, handshake, 4096, 0, 0,
+        CommunicationMode.ClientToServer, null, socketCreator, socketFactory);
+
+    verify(endpointManager, times(1)).referenceEndpoint(any(), any());
+
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/ServerLocationExtensionTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/ServerLocationExtensionTest.java
new file mode 100644
index 0000000000..2ace7e02aa
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/ServerLocationExtensionTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+
+public class ServerLocationExtensionTest {
+
+  @Test
+  public void givenTwoObjectsWithSameHostAndPortAndId_whenCompared_thenAreEquals() {
+    final ServerLocation serverLocation1 = new ServerLocation("localhost", 1);
+    final String uniqueId1 = new InternalDistributedMember("localhost", 1).getUniqueId();
+
+    ServerLocationAndMemberId serverLocationAndMemberId1 =
+        new ServerLocationAndMemberId(serverLocation1, uniqueId1);
+    ServerLocationAndMemberId serverLocationAndMemberId2 =
+        new ServerLocationAndMemberId(serverLocation1, uniqueId1);
+
+    ServerLocationExtension serverLocationExtension1 =
+        new ServerLocationExtension(serverLocationAndMemberId1);
+    ServerLocationExtension serverLocationExtension2 =
+        new ServerLocationExtension(serverLocationAndMemberId2);
+
+    assertThat(serverLocationExtension1).isEqualTo(serverLocationExtension2);
+  }
+
+  @Test
+  public void givenTwoObjectsWithSameHostAndPortButDifferentViewId_whenCompared_thenAreEquals() {
+
+    final ServerLocation serverLocation1 = new ServerLocation("localhost", 1);
+    InternalDistributedMember idmWithView1 = new InternalDistributedMember("localhost", 1);
+    idmWithView1.setVmViewId(1);
+    InternalDistributedMember idmWithView2 = new InternalDistributedMember("localhost", 1);
+    idmWithView2.setVmViewId(2);
+
+    ServerLocationAndMemberId serverLocationAndMemberId1 =
+        new ServerLocationAndMemberId(serverLocation1, idmWithView1.getUniqueId());
+    ServerLocationAndMemberId serverLocationAndMemberId2 =
+        new ServerLocationAndMemberId(serverLocation1, idmWithView2.getUniqueId());
+
+    ServerLocationExtension serverLocationExtension1 =
+        new ServerLocationExtension(serverLocationAndMemberId1);
+    ServerLocationExtension serverLocationExtension2 =
+        new ServerLocationExtension(serverLocationAndMemberId2);
+
+    assertThat(serverLocationExtension1).isEqualTo(serverLocationExtension2);
+  }
+
+  @Test
+  public void givenTwoObjectsWithDifferentHostPortAndId_whenCompared_thenAreNotEquals() {
+    final ServerLocation serverLocation1 = new ServerLocation("localhost", 1);
+    final ServerLocation serverLocation2 = new ServerLocation("localhost", 2);
+    final String uniqueId1 = new InternalDistributedMember("localhost", 1).getUniqueId();
+    final String uniqueId2 = new InternalDistributedMember("localhost", 2).getUniqueId();
+
+    ServerLocationAndMemberId serverLocationAndMemberId1 =
+        new ServerLocationAndMemberId(serverLocation1, uniqueId1);
+    ServerLocationAndMemberId serverLocationAndMemberId2 =
+        new ServerLocationAndMemberId(serverLocation2, uniqueId2);
+
+    ServerLocationExtension serverLocationExtension1 =
+        new ServerLocationExtension(serverLocationAndMemberId1);
+    ServerLocationExtension serverLocationExtension2 =
+        new ServerLocationExtension(serverLocationAndMemberId2);
+
+    assertThat(serverLocationExtension1).isNotEqualTo(serverLocationExtension2);
+  }
+
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/ServerLocationTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/ServerLocationTest.java
new file mode 100644
index 0000000000..80f3bf4e18
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/ServerLocationTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.Test;
+
+public class ServerLocationTest {
+
+  @Test
+  public void givenTwoObjectsWithSameHostAndPortwhenCompared_thenAreEquals() {
+    final ServerLocation serverLocation1 = new ServerLocation("localhost", 1);
+    final ServerLocation serverLocation2 = new ServerLocation("localhost", 1);
+
+    assertThat(serverLocation1).isEqualTo(serverLocation2);
+  }
+
+
+  @Test
+  public void givenTwoObjectsWithSamePortAndDifferentPortwhenCompared_thenAreNotEquals() {
+    final ServerLocation serverLocation1 = new ServerLocation("localhost", 1);
+    final ServerLocation serverLocation2 = new ServerLocation("localhost", 2);
+
+    assertThat(serverLocation1).isNotEqualTo(serverLocation2);
+  }
+
+  @Test
+  public void objectCreatedWithCopyConstructorwhenComparedToOriginWithInitialRequiresCredentials_isEqual() {
+    final ServerLocation serverLocation1 = new ServerLocation("localhost", 1);
+    // requiresCredentials is deliberately left untouched so the copy sees the initial state.
+    final ServerLocation serverLocation2 = new ServerLocation(serverLocation1);
+
+    assertThat(serverLocation1).isEqualTo(serverLocation2);
+    assertThat(serverLocation1.getRequiresCredentials())
+        .isEqualTo(serverLocation2.getRequiresCredentials());
+  }
+
+  @Test
+  public void objectCreatedWithCopyConstructorwhenComparedToOriginWithNotRequiresCredentials_isEqual() {
+    final ServerLocation serverLocation1 = new ServerLocation("localhost", 1);
+    serverLocation1.setRequiresCredentials(false);
+    final ServerLocation serverLocation2 = new ServerLocation(serverLocation1);
+
+    assertThat(serverLocation1).isEqualTo(serverLocation2);
+    assertThat(serverLocation1.getRequiresCredentials())
+        .isEqualTo(serverLocation2.getRequiresCredentials());
+  }
+
+  @Test
+  public void objectCreatedWithCopyConstructorwhenComparedToOriginWithRequiresCredentials_isEqual() {
+    final ServerLocation serverLocation1 = new ServerLocation("localhost", 1);
+    serverLocation1.setRequiresCredentials(true);
+    final ServerLocation serverLocation2 = new ServerLocation(serverLocation1);
+
+    assertThat(serverLocation1).isEqualTo(serverLocation2);
+    assertThat(serverLocation1.getRequiresCredentials())
+        .isEqualTo(serverLocation2.getRequiresCredentials());
+  }
+}