You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2022/02/28 23:34:31 UTC
[geode] branch support/1.15 updated: GEODE-10063: Correctly set primary queue connection. (#7382)
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch support/1.15
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.15 by this push:
new d607a91 GEODE-10063: Correctly set primary queue connection. (#7382)
d607a91 is described below
commit d607a91bee8e0f940fdf524c8b9ddbb2926cc8a1
Author: Eric Shu <es...@pivotal.io>
AuthorDate: Fri Feb 25 09:57:18 2022 -0800
GEODE-10063: Correctly set primary queue connection. (#7382)
* When adding QueueConnection to connectionList, also checks if
the connection has been destroyed by another thread to prevent
a bad connection is being added to the list.
* Schedule RedundancySatisfierTask after remove connection so
that bad connection can be detected.
* During recoveryPrimary in RedundancySatisfierTask also
check if primary connection is destroyed. If so, connection
from backups will be promoted to primary.
(cherry picked from commit 45cbe7f8df39704899b0305729749dc1cc9ffe89)
---
.../cache/client/internal/QueueManagerImpl.java | 22 ++-
.../client/internal/QueueManagerImplTest.java | 192 +++++++++++++++++++++
2 files changed, 206 insertions(+), 8 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
index 28485c7..4da3aa7 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
@@ -41,6 +41,7 @@ import org.apache.geode.GemFireConfigException;
import org.apache.geode.GemFireException;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.Immutable;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.NoSubscriptionServersAvailableException;
import org.apache.geode.cache.client.ServerConnectivityException;
@@ -355,7 +356,8 @@ public class QueueManagerImpl implements QueueManager {
endpointCrashed(con.getEndpoint());
}
- private void endpointCrashed(Endpoint endpoint) {
+ @VisibleForTesting
+ void endpointCrashed(Endpoint endpoint) {
QueueConnectionImpl deadConnection;
// We must be synchronized while checking to see if we have a queue connection for the endpoint,
// because when we need to prevent a race between adding a queue connection to the map
@@ -373,8 +375,8 @@ public class QueueManagerImpl implements QueueManager {
? (deadConnection.getUpdater().isPrimary() ? "Primary" : "Redundant")
: "Queue",
endpoint});
- scheduleRedundancySatisfierIfNeeded(0);
deadConnection.internalDestroy();
+ scheduleRedundancySatisfierIfNeeded(0);
} else {
if (logger.isDebugEnabled()) {
logger.debug("Ignoring crashed endpoint {} it does not have a queue.", endpoint);
@@ -724,7 +726,8 @@ public class QueueManagerImpl implements QueueManager {
}
}
- private QueueConnectionImpl promoteBackupToPrimary(List<Connection> backups) {
+ @VisibleForTesting
+ QueueConnectionImpl promoteBackupToPrimary(List<Connection> backups) {
QueueConnectionImpl primary = null;
for (int i = 0; primary == null && i < backups.size(); i++) {
QueueConnectionImpl lastConnection = (QueueConnectionImpl) backups.get(i);
@@ -844,12 +847,13 @@ public class QueueManagerImpl implements QueueManager {
* First we try to make a backup server the primary, but if run out of backup servers we will try
* to find a new server.
*/
- private void recoverPrimary(Set<ServerLocation> excludedServers) {
+ @VisibleForTesting
+ void recoverPrimary(Set<ServerLocation> excludedServers) {
if (pool.getPoolOrCacheCancelInProgress() != null) {
return;
}
final boolean isDebugEnabled = logger.isDebugEnabled();
- if (queueConnections.getPrimary() != null) {
+ if (queueConnections.getPrimary() != null && !queueConnections.getPrimary().isDestroyed()) {
if (isDebugEnabled) {
logger.debug("Primary recovery not needed");
}
@@ -980,7 +984,8 @@ public class QueueManagerImpl implements QueueManager {
// connection but CCU may died as endpoint closed....
// so before putting connection need to see if something(crash) happen we should be able to
// recover from it
- private boolean addToConnectionList(QueueConnectionImpl connection, boolean isPrimary) {
+ @VisibleForTesting
+ boolean addToConnectionList(QueueConnectionImpl connection, boolean isPrimary) {
boolean isBadConnection;
synchronized (lock) {
ClientUpdater cu = connection.getUpdater();
@@ -989,7 +994,7 @@ public class QueueManagerImpl implements QueueManager {
}
// now still CCU can died but then it will execute Checkendpoint with lock it will remove
// connection connection and it will reschedule it.
- if (connection.getEndpoint().isClosed() || shuttingDown
+ if (connection.getEndpoint().isClosed() || connection.isDestroyed() || shuttingDown
|| pool.getPoolOrCacheCancelInProgress() != null) {
isBadConnection = true;
} else {
@@ -1022,7 +1027,8 @@ public class QueueManagerImpl implements QueueManager {
return !isBadConnection;
}
- private void scheduleRedundancySatisfierIfNeeded(long delay) {
+ @VisibleForTesting
+ void scheduleRedundancySatisfierIfNeeded(long delay) {
if (shuttingDown) {
return;
}
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/QueueManagerImplTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/QueueManagerImplTest.java
new file mode 100644
index 0000000..264a834
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/QueueManagerImplTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.client.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+
+public class QueueManagerImplTest {
+ private final InternalPool pool = mock(InternalPool.class, RETURNS_DEEP_STUBS);
+ private final Endpoint endpoint = mock(Endpoint.class);
+ private final Endpoint backupEndpoint = mock(Endpoint.class);
+ private final QueueConnectionImpl primary = mock(QueueConnectionImpl.class);
+ private final QueueConnectionImpl backup = mock(QueueConnectionImpl.class);
+ private final ClientUpdater clientUpdater = mock(ClientUpdater.class);
+ private QueueManagerImpl queueManager;
+
+ @Before
+ public void setup() {
+ queueManager = new QueueManagerImpl(pool, null, null, null, 1, 1, null, null);
+ when(primary.getEndpoint()).thenReturn(endpoint);
+ when(primary.getUpdater()).thenReturn(clientUpdater);
+ when(primary.isDestroyed()).thenReturn(false);
+ when(clientUpdater.isAlive()).thenReturn(true);
+ when(clientUpdater.isProcessing()).thenReturn(true);
+ when(endpoint.isClosed()).thenReturn(false);
+ when(backup.getEndpoint()).thenReturn(backupEndpoint);
+ when(backup.getUpdater()).thenReturn(clientUpdater);
+ when(backupEndpoint.isClosed()).thenReturn(false);
+ }
+
+ @Test
+ public void addNoClientUpdaterConnectionToConnectionListReturnsFalse() {
+ when(primary.getUpdater()).thenReturn(null);
+
+ assertThat(queueManager.addToConnectionList(primary, true)).isFalse();
+ QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
+ assertThat(connectionList.getPrimary()).isNull();
+ }
+
+ @Test
+ public void addNotAliveClientUpdaterConnectionToConnectionListReturnsFalse() {
+ when(clientUpdater.isAlive()).thenReturn(false);
+
+ assertThat(queueManager.addToConnectionList(primary, true)).isFalse();
+ QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
+ assertThat(connectionList.getPrimary()).isNull();
+ }
+
+ @Test
+ public void addNotProcessingClientUpdaterConnectionToConnectionListReturnsFalse() {
+ when(clientUpdater.isProcessing()).thenReturn(false);
+
+ assertThat(queueManager.addToConnectionList(primary, true)).isFalse();
+ QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
+ assertThat(connectionList.getPrimary()).isNull();
+ }
+
+ @Test
+ public void addClosedEndpointConnectionToConnectionListReturnsFalse() throws Exception {
+ when(endpoint.isClosed()).thenReturn(true);
+
+ assertThat(queueManager.addToConnectionList(primary, true)).isFalse();
+ QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
+ assertThat(connectionList.getPrimary()).isNull();
+ verify(primary).internalClose(true);
+ }
+
+ @Test
+ public void addDestroyedConnectionToConnectionListReturnsFalse() throws Exception {
+ when(primary.isDestroyed()).thenReturn(true);
+
+ assertThat(queueManager.addToConnectionList(primary, true)).isFalse();
+ QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
+ assertThat(connectionList.getPrimary()).isNull();
+ verify(primary).internalClose(true);
+ }
+
+ @Test
+ public void addConnectionToConnectionListWhenCancelInProgressReturnsFalse() throws Exception {
+ when(pool.getPoolOrCacheCancelInProgress()).thenReturn("cache closed");
+
+ assertThat(queueManager.addToConnectionList(primary, true)).isFalse();
+ QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
+ assertThat(connectionList.getPrimary()).isNull();
+ verify(primary).internalClose(true);
+ }
+
+ @Test
+ public void addConnectionToConnectionListCanSetPrimary() throws Exception {
+ assertThat(queueManager.addToConnectionList(primary, true)).isTrue();
+ QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
+ assertThat(connectionList.getPrimary()).isEqualTo(primary);
+ verify(primary, never()).internalClose(true);
+ }
+
+ @Test
+ public void addConnectionToConnectionListCanAddBackups() throws Exception {
+ queueManager.addToConnectionList(primary, true);
+
+ assertThat(queueManager.addToConnectionList(backup, false)).isTrue();
+ QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
+ assertThat(connectionList.getPrimary()).isEqualTo(primary);
+ assertThat(connectionList.getBackups()).contains(backup);
+ verify(backup, never()).internalClose(true);
+ }
+
+ @Test
+ public void endpointCrashedScheduleRedundancySatisfierAfterConnectionDestroyed() {
+ addConnections();
+ QueueManagerImpl spy = spy(queueManager);
+ InOrder inOrder = inOrder(primary, spy);
+ doNothing().when(spy).scheduleRedundancySatisfierIfNeeded(0);
+
+ spy.endpointCrashed(endpoint);
+
+ inOrder.verify(primary).internalDestroy();
+ inOrder.verify(spy).scheduleRedundancySatisfierIfNeeded(0);
+ QueueManager.QueueConnections connectionList = spy.getAllConnectionsNoWait();
+ assertThat(connectionList.getPrimary()).isNull();
+ }
+
+ private void addConnections() {
+ queueManager.addToConnectionList(primary, true);
+ queueManager.addToConnectionList(backup, false);
+ }
+
+ @Test
+ public void recoverPrimaryDoesNotPromoteBackupToPrimaryIfPrimaryExists() {
+ addConnections();
+ QueueManagerImpl spy = spy(queueManager);
+
+ spy.recoverPrimary(null);
+
+ verify(spy, never()).promoteBackupToPrimary(anyList());
+ }
+
+ @Test
+ public void recoverPrimaryPromoteBackupToPrimaryIfNoPrimary() {
+ QueueManagerImpl spy = spy(queueManager);
+ spy.addToConnectionList(backup, false);
+ doReturn(backup).when(spy).promoteBackupToPrimary(anyList());
+
+ spy.recoverPrimary(null);
+
+ verify(spy).promoteBackupToPrimary(anyList());
+ verifyQueueConnectionsAfterRecoverPrimary(spy);
+ }
+
+ private void verifyQueueConnectionsAfterRecoverPrimary(QueueManagerImpl spy) {
+ QueueManager.QueueConnections connectionList = spy.getAllConnectionsNoWait();
+ assertThat(connectionList.getPrimary()).isEqualTo(backup);
+ assertThat(connectionList.getBackups()).isEmpty();
+ }
+
+ @Test
+ public void recoverPrimaryPromoteBackupToPrimaryIfPrimaryConnectionIsDestroyed() {
+ addConnections();
+ QueueManagerImpl spy = spy(queueManager);
+ doReturn(backup).when(spy).promoteBackupToPrimary(anyList());
+ when(primary.isDestroyed()).thenReturn(true);
+
+ spy.recoverPrimary(null);
+
+ verify(spy).promoteBackupToPrimary(anyList());
+ verifyQueueConnectionsAfterRecoverPrimary(spy);
+ }
+}