You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2022/02/28 23:34:31 UTC

[geode] branch support/1.15 updated: GEODE-10063: Correctly set primary queue connection. (#7382)

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch support/1.15
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.15 by this push:
     new d607a91  GEODE-10063: Correctly set primary queue connection.  (#7382)
d607a91 is described below

commit d607a91bee8e0f940fdf524c8b9ddbb2926cc8a1
Author: Eric Shu <es...@pivotal.io>
AuthorDate: Fri Feb 25 09:57:18 2022 -0800

    GEODE-10063: Correctly set primary queue connection.  (#7382)
    
          *  When adding a QueueConnection to connectionList, also check whether
             the connection has been destroyed by another thread, to prevent
             a bad connection from being added to the list.
          *  Schedule RedundancySatisfierTask after removing the connection so
             that the bad connection can be detected.
          *  During recoverPrimary in RedundancySatisfierTask, also
             check whether the primary connection is destroyed. If so, a
             connection from the backups will be promoted to primary.
    
    (cherry picked from commit 45cbe7f8df39704899b0305729749dc1cc9ffe89)
---
 .../cache/client/internal/QueueManagerImpl.java    |  22 ++-
 .../client/internal/QueueManagerImplTest.java      | 192 +++++++++++++++++++++
 2 files changed, 206 insertions(+), 8 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
index 28485c7..4da3aa7 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
@@ -41,6 +41,7 @@ import org.apache.geode.GemFireConfigException;
 import org.apache.geode.GemFireException;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.annotations.Immutable;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.InterestResultPolicy;
 import org.apache.geode.cache.NoSubscriptionServersAvailableException;
 import org.apache.geode.cache.client.ServerConnectivityException;
@@ -355,7 +356,8 @@ public class QueueManagerImpl implements QueueManager {
     endpointCrashed(con.getEndpoint());
   }
 
-  private void endpointCrashed(Endpoint endpoint) {
+  @VisibleForTesting
+  void endpointCrashed(Endpoint endpoint) {
     QueueConnectionImpl deadConnection;
     // We must be synchronized while checking to see if we have a queue connection for the endpoint,
     // because when we need to prevent a race between adding a queue connection to the map
@@ -373,8 +375,8 @@ public class QueueManagerImpl implements QueueManager {
                   ? (deadConnection.getUpdater().isPrimary() ? "Primary" : "Redundant")
                   : "Queue",
                   endpoint});
-      scheduleRedundancySatisfierIfNeeded(0);
       deadConnection.internalDestroy();
+      scheduleRedundancySatisfierIfNeeded(0);
     } else {
       if (logger.isDebugEnabled()) {
         logger.debug("Ignoring crashed endpoint {} it does not have a queue.", endpoint);
@@ -724,7 +726,8 @@ public class QueueManagerImpl implements QueueManager {
     }
   }
 
-  private QueueConnectionImpl promoteBackupToPrimary(List<Connection> backups) {
+  @VisibleForTesting
+  QueueConnectionImpl promoteBackupToPrimary(List<Connection> backups) {
     QueueConnectionImpl primary = null;
     for (int i = 0; primary == null && i < backups.size(); i++) {
       QueueConnectionImpl lastConnection = (QueueConnectionImpl) backups.get(i);
@@ -844,12 +847,13 @@ public class QueueManagerImpl implements QueueManager {
    * First we try to make a backup server the primary, but if run out of backup servers we will try
    * to find a new server.
    */
-  private void recoverPrimary(Set<ServerLocation> excludedServers) {
+  @VisibleForTesting
+  void recoverPrimary(Set<ServerLocation> excludedServers) {
     if (pool.getPoolOrCacheCancelInProgress() != null) {
       return;
     }
     final boolean isDebugEnabled = logger.isDebugEnabled();
-    if (queueConnections.getPrimary() != null) {
+    if (queueConnections.getPrimary() != null && !queueConnections.getPrimary().isDestroyed()) {
       if (isDebugEnabled) {
         logger.debug("Primary recovery not needed");
       }
@@ -980,7 +984,8 @@ public class QueueManagerImpl implements QueueManager {
   // connection but CCU may died as endpoint closed....
   // so before putting connection need to see if something(crash) happen we should be able to
   // recover from it
-  private boolean addToConnectionList(QueueConnectionImpl connection, boolean isPrimary) {
+  @VisibleForTesting
+  boolean addToConnectionList(QueueConnectionImpl connection, boolean isPrimary) {
     boolean isBadConnection;
     synchronized (lock) {
       ClientUpdater cu = connection.getUpdater();
@@ -989,7 +994,7 @@ public class QueueManagerImpl implements QueueManager {
       }
       // now still CCU can died but then it will execute Checkendpoint with lock it will remove
       // connection connection and it will reschedule it.
-      if (connection.getEndpoint().isClosed() || shuttingDown
+      if (connection.getEndpoint().isClosed() || connection.isDestroyed() || shuttingDown
           || pool.getPoolOrCacheCancelInProgress() != null) {
         isBadConnection = true;
       } else {
@@ -1022,7 +1027,8 @@ public class QueueManagerImpl implements QueueManager {
     return !isBadConnection;
   }
 
-  private void scheduleRedundancySatisfierIfNeeded(long delay) {
+  @VisibleForTesting
+  void scheduleRedundancySatisfierIfNeeded(long delay) {
     if (shuttingDown) {
       return;
     }
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/QueueManagerImplTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/QueueManagerImplTest.java
new file mode 100644
index 0000000..264a834
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/QueueManagerImplTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.client.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+
+public class QueueManagerImplTest {
+  private final InternalPool pool = mock(InternalPool.class, RETURNS_DEEP_STUBS);
+  private final Endpoint endpoint = mock(Endpoint.class);
+  private final Endpoint backupEndpoint = mock(Endpoint.class);
+  private final QueueConnectionImpl primary = mock(QueueConnectionImpl.class);
+  private final QueueConnectionImpl backup = mock(QueueConnectionImpl.class);
+  private final ClientUpdater clientUpdater = mock(ClientUpdater.class);
+  private QueueManagerImpl queueManager;
+
+  @Before
+  public void setup() {
+    queueManager = new QueueManagerImpl(pool, null, null, null, 1, 1, null, null);
+    when(primary.getEndpoint()).thenReturn(endpoint);
+    when(primary.getUpdater()).thenReturn(clientUpdater);
+    when(primary.isDestroyed()).thenReturn(false);
+    when(clientUpdater.isAlive()).thenReturn(true);
+    when(clientUpdater.isProcessing()).thenReturn(true);
+    when(endpoint.isClosed()).thenReturn(false);
+    when(backup.getEndpoint()).thenReturn(backupEndpoint);
+    when(backup.getUpdater()).thenReturn(clientUpdater);
+    when(backupEndpoint.isClosed()).thenReturn(false);
+  }
+
+  @Test
+  public void addNoClientUpdaterConnectionToConnectionListReturnsFalse() {
+    when(primary.getUpdater()).thenReturn(null);
+
+    assertThat(queueManager.addToConnectionList(primary, true)).isFalse();
+    QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
+    assertThat(connectionList.getPrimary()).isNull();
+  }
+
+  @Test
+  public void addNotAliveClientUpdaterConnectionToConnectionListReturnsFalse() {
+    when(clientUpdater.isAlive()).thenReturn(false);
+
+    assertThat(queueManager.addToConnectionList(primary, true)).isFalse();
+    QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
+    assertThat(connectionList.getPrimary()).isNull();
+  }
+
+  @Test
+  public void addNotProcessingClientUpdaterConnectionToConnectionListReturnsFalse() {
+    when(clientUpdater.isProcessing()).thenReturn(false);
+
+    assertThat(queueManager.addToConnectionList(primary, true)).isFalse();
+    QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
+    assertThat(connectionList.getPrimary()).isNull();
+  }
+
+  @Test
+  public void addClosedEndpointConnectionToConnectionListReturnsFalse() throws Exception {
+    when(endpoint.isClosed()).thenReturn(true);
+
+    assertThat(queueManager.addToConnectionList(primary, true)).isFalse();
+    QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
+    assertThat(connectionList.getPrimary()).isNull();
+    verify(primary).internalClose(true);
+  }
+
+  @Test
+  public void addDestroyedConnectionToConnectionListReturnsFalse() throws Exception {
+    when(primary.isDestroyed()).thenReturn(true);
+
+    assertThat(queueManager.addToConnectionList(primary, true)).isFalse();
+    QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
+    assertThat(connectionList.getPrimary()).isNull();
+    verify(primary).internalClose(true);
+  }
+
+  @Test
+  public void addConnectionToConnectionListWhenCancelInProgressReturnsFalse() throws Exception {
+    when(pool.getPoolOrCacheCancelInProgress()).thenReturn("cache closed");
+
+    assertThat(queueManager.addToConnectionList(primary, true)).isFalse();
+    QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
+    assertThat(connectionList.getPrimary()).isNull();
+    verify(primary).internalClose(true);
+  }
+
+  @Test
+  public void addConnectionToConnectionListCanSetPrimary() throws Exception {
+    assertThat(queueManager.addToConnectionList(primary, true)).isTrue();
+    QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
+    assertThat(connectionList.getPrimary()).isEqualTo(primary);
+    verify(primary, never()).internalClose(true);
+  }
+
+  @Test
+  public void addConnectionToConnectionListCanAddBackups() throws Exception {
+    queueManager.addToConnectionList(primary, true);
+
+    assertThat(queueManager.addToConnectionList(backup, false)).isTrue();
+    QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
+    assertThat(connectionList.getPrimary()).isEqualTo(primary);
+    assertThat(connectionList.getBackups()).contains(backup);
+    verify(backup, never()).internalClose(true);
+  }
+
+  @Test
+  public void endpointCrashedScheduleRedundancySatisfierAfterConnectionDestroyed() {
+    addConnections();
+    QueueManagerImpl spy = spy(queueManager);
+    InOrder inOrder = inOrder(primary, spy);
+    doNothing().when(spy).scheduleRedundancySatisfierIfNeeded(0);
+
+    spy.endpointCrashed(endpoint);
+
+    inOrder.verify(primary).internalDestroy();
+    inOrder.verify(spy).scheduleRedundancySatisfierIfNeeded(0);
+    QueueManager.QueueConnections connectionList = spy.getAllConnectionsNoWait();
+    assertThat(connectionList.getPrimary()).isNull();
+  }
+
+  private void addConnections() {
+    queueManager.addToConnectionList(primary, true);
+    queueManager.addToConnectionList(backup, false);
+  }
+
+  @Test
+  public void recoverPrimaryDoesNotPromoteBackupToPrimaryIfPrimaryExists() {
+    addConnections();
+    QueueManagerImpl spy = spy(queueManager);
+
+    spy.recoverPrimary(null);
+
+    verify(spy, never()).promoteBackupToPrimary(anyList());
+  }
+
+  @Test
+  public void recoverPrimaryPromoteBackupToPrimaryIfNoPrimary() {
+    QueueManagerImpl spy = spy(queueManager);
+    spy.addToConnectionList(backup, false);
+    doReturn(backup).when(spy).promoteBackupToPrimary(anyList());
+
+    spy.recoverPrimary(null);
+
+    verify(spy).promoteBackupToPrimary(anyList());
+    verifyQueueConnectionsAfterRecoverPrimary(spy);
+  }
+
+  private void verifyQueueConnectionsAfterRecoverPrimary(QueueManagerImpl spy) {
+    QueueManager.QueueConnections connectionList = spy.getAllConnectionsNoWait();
+    assertThat(connectionList.getPrimary()).isEqualTo(backup);
+    assertThat(connectionList.getBackups()).isEmpty();
+  }
+
+  @Test
+  public void recoverPrimaryPromoteBackupToPrimaryIfPrimaryConnectionIsDestroyed() {
+    addConnections();
+    QueueManagerImpl spy = spy(queueManager);
+    doReturn(backup).when(spy).promoteBackupToPrimary(anyList());
+    when(primary.isDestroyed()).thenReturn(true);
+
+    spy.recoverPrimary(null);
+
+    verify(spy).promoteBackupToPrimary(anyList());
+    verifyQueueConnectionsAfterRecoverPrimary(spy);
+  }
+}