You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2018/03/21 19:39:48 UTC
[geode] branch develop updated: GEODE-4451: Changed sender startup
to retry when a remote security exception occurs
This is an automated email from the ASF dual-hosted git repository.
boglesby pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 8e8ec93 GEODE-4451: Changed sender startup to retry when a remote security exception occurs
8e8ec93 is described below
commit 8e8ec93d516b76d4fb559ab91c0a5762708d1789
Author: Barry Oglesby <bo...@users.noreply.github.com>
AuthorDate: Wed Mar 21 12:39:45 2018 -0700
GEODE-4451: Changed sender startup to retry when a remote security exception occurs
* GEODE-4451: Changed sender startup to retry when a remote security exception occurs
* GEODE-4451: Prevented sender from being created when members aren't all current version
* GEODE-4451: Apply spotless
* GEODE-4451: Refactored test to use ConfigurationProperties
---
...currentParallelGatewaySenderEventProcessor.java | 2 +-
.../wan/parallel/ParallelGatewaySenderQueue.java | 3 +-
...oncurrentSerialGatewaySenderEventProcessor.java | 2 +-
.../geode/internal/i18n/LocalizedStrings.java | 2 +-
.../cli/commands/CreateGatewaySenderCommand.java | 13 ++
.../management/internal/cli/i18n/CliStrings.java | 2 +
.../commands/CreateGatewaySenderCommandTest.java | 23 ++++
.../wan/GatewaySenderEventRemoteDispatcher.java | 105 ++++++++-------
.../cache/wan/WANRollingUpgradeDUnitTest.java | 150 ++++++++++++++++++---
.../geode/internal/cache/wan/WANTestBase.java | 46 ++-----
.../wan/misc/NewWanAuthenticationDUnitTest.java | 86 ++++++++----
.../internal/cache/wan/misc/WANSSLDUnitTest.java | 69 +++++++---
...rityManagerWithInvalidCredentials.security.json | 18 +++
13 files changed, 367 insertions(+), 154 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index c4b1546..54b7034 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -189,7 +189,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor
if (ex != null) {
throw new GatewaySenderException(
LocalizedStrings.Sender_COULD_NOT_START_GATEWAYSENDER_0_BECAUSE_OF_EXCEPTION_1
- .toLocalizedString(new Object[] {this.getId(), ex.getMessage()}),
+ .toLocalizedString(new Object[] {this.sender.getId(), ex.getMessage()}),
ex.getCause());
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 75ce63c..3aa8534 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1577,8 +1577,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
* Constructor : Creates and initializes the thread
*/
public BatchRemovalThread(InternalCache c, ParallelGatewaySenderQueue queue) {
- super("BatchRemovalThread");
- // TODO:REF: Name for this thread ?
+ super("BatchRemovalThread for GatewaySender_" + queue.sender.getId() + "_" + queue.index);
this.setDaemon(true);
this.cache = c;
this.parallelQueue = queue;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index 6413e1c..e7beb07 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -221,7 +221,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor
if (ex != null) {
throw new GatewaySenderException(
LocalizedStrings.Sender_COULD_NOT_START_GATEWAYSENDER_0_BECAUSE_OF_EXCEPTION_1
- .toLocalizedString(new Object[] {this.getId(), ex.getMessage()}),
+ .toLocalizedString(new Object[] {this.sender.getId(), ex.getMessage()}),
ex.getCause());
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
index 905086a..a81d5a5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
@@ -2122,7 +2122,7 @@ public class LocalizedStrings {
public static final StringId CacheClientProxy_EXCEPTION_OCCURRED_WHILE_TRYING_TO_CREATE_A_MESSAGE_QUEUE =
new StringId(2297, "Exception occurred while trying to create a message queue.");
public static final StringId GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1 =
- new StringId(2298, "{0} : Could not connect. {1}");
+ new StringId(2298, "{0} : Could not connect due to: {1}");
public static final StringId CacheCollector_UNABLE_TO_MIX_REGION_AND_ENTRY_SNAPSHOTS_IN_CACHECOLLECTOR =
new StringId(2300, "Unable to mix region and entry snapshots in CacheCollector.");
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java
index 8a86c09..b132852 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java
@@ -23,6 +23,8 @@ import org.springframework.shell.core.annotation.CliOption;
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Version;
import org.apache.geode.management.cli.CliMetaData;
import org.apache.geode.management.cli.ConverterHint;
import org.apache.geode.management.cli.Result;
@@ -116,6 +118,12 @@ public class CreateGatewaySenderCommand extends GfshCommand {
Set<DistributedMember> membersToCreateGatewaySenderOn = getMembers(onGroups, onMember);
+ // Don't allow sender to be created if all members are not the current version.
+ if (!verifyAllCurrentVersion(membersToCreateGatewaySenderOn)) {
+ return ResultBuilder.createUserErrorResult(
+ CliStrings.CREATE_GATEWAYSENDER__MSG__CAN_NOT_CREATE_DIFFERENT_VERSIONS);
+ }
+
List<CliFunctionResult> gatewaySenderCreateResults =
executeAndGetFunctionResult(GatewaySenderCreateFunction.INSTANCE, gatewaySenderFunctionArgs,
membersToCreateGatewaySenderOn);
@@ -139,6 +147,11 @@ public class CreateGatewaySenderCommand extends GfshCommand {
return result;
}
+ private boolean verifyAllCurrentVersion(Set<DistributedMember> members) {
+ return members.stream().allMatch(
+ member -> ((InternalDistributedMember) member).getVersionObject().equals(Version.CURRENT));
+ }
+
public static class Interceptor extends AbstractCliAroundInterceptor {
@Override
public Result preExecution(GfshParseResult parseResult) {
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
index a1e9e3e..e7237d4 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
@@ -2248,6 +2248,8 @@ public class CliStrings {
"Could not instantiate class \"{0}\" specified for \"{1}\".";
public static final String CREATE_GATEWAYSENDER__MSG__COULD_NOT_ACCESS_CLASS_0_SPECIFIED_FOR_1 =
"Could not access class \"{0}\" specified for \"{1}\".";
+ public static final String CREATE_GATEWAYSENDER__MSG__CAN_NOT_CREATE_DIFFERENT_VERSIONS =
+ "Gateway Sender cannot be created until all members are the current version";
/* stop gateway-receiver */
public static final String START_GATEWAYSENDER = "start gateway-sender";
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java
index 122cf10..b24792f 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.verify;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -33,9 +34,13 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.InternalClusterConfigurationService;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
+import org.apache.geode.management.internal.cli.i18n.CliStrings;
import org.apache.geode.management.internal.configuration.domain.XmlEntity;
import org.apache.geode.test.junit.categories.UnitTest;
import org.apache.geode.test.junit.rules.GfshParserRule;
@@ -148,4 +153,22 @@ public class CreateGatewaySenderCommandTest {
.hasNoFailToPersistError();
verify(ccService, never()).deleteXmlEntity(any(), any());
}
+
+ @Test
+ public void whenMembersAreDifferentVersions() {
+ // Create a set of mixed version members
+ Set<DistributedMember> members = new HashSet<>();
+ InternalDistributedMember currentVersionMember = mock(InternalDistributedMember.class);
+ doReturn(Version.CURRENT).when(currentVersionMember).getVersionObject();
+ InternalDistributedMember oldVersionMember = mock(InternalDistributedMember.class);
+ doReturn(Version.GEODE_140).when(oldVersionMember).getVersionObject();
+ members.add(currentVersionMember);
+ members.add(oldVersionMember);
+ doReturn(members).when(command).getMembers(any(), any());
+
+ // Verify executing the command fails
+ gfsh.executeAndAssertThat(command,
+ "create gateway-sender --id=1 --remote-distributed-system-id=1").statusIsError()
+ .containsOutput(CliStrings.CREATE_GATEWAYSENDER__MSG__CAN_NOT_CREATE_DIFFERENT_VERSIONS);
+ }
}
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 782d7c0..8357669 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -78,10 +78,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
try {
initializeConnection();
} catch (GatewaySenderException e) {
- if (e.getCause() instanceof GemFireSecurityException) {
- throw e;
- }
-
+ // It is ok to ignore this exception. It is logged in the initializeConnection call.
}
}
@@ -168,7 +165,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
// if our pool is shutdown then just be silent
} else if (t instanceof IOException || t instanceof ServerConnectivityException
|| t instanceof ConnectionDestroyedException || t instanceof MessageTooLargeException
- || t instanceof IllegalStateException) {
+ || t instanceof IllegalStateException || t instanceof GemFireSecurityException) {
this.processor.handleException();
// If the cause is an IOException or a ServerException, sleep and retry.
// Sleep for a bit and recheck.
@@ -431,58 +428,29 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
}
}
} catch (ServerConnectivityException e) {
- this.failedConnectCount++;
- Throwable ex = null;
+ // Get the exception to throw
+ GatewaySenderException gse = getInitializeConnectionExceptionToThrow(e);
- if (e.getCause() instanceof GemFireSecurityException) {
- ex = e.getCause();
- if (logConnectionFailure()) {
- // only log this message once; another msg is logged once we connect
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1,
- new Object[] {this.processor.getSender().getId(), ex.getMessage()}));
- }
- throw new GatewaySenderException(ex);
- }
- List<ServerLocation> servers = this.sender.getProxy().getCurrentServers();
- String ioMsg = null;
- if (servers.size() == 0) {
- ioMsg = LocalizedStrings.GatewayEventRemoteDispatcher_THERE_ARE_NO_ACTIVE_SERVERS
- .toLocalizedString();
- } else {
- final StringBuilder buffer = new StringBuilder();
- for (ServerLocation server : servers) {
- String endpointName = String.valueOf(server);
- if (buffer.length() > 0) {
- buffer.append(", ");
- }
- buffer.append(endpointName);
- }
- ioMsg =
- LocalizedStrings.GatewayEventRemoteDispatcher_NO_AVAILABLE_CONNECTION_WAS_FOUND_BUT_THE_FOLLOWING_ACTIVE_SERVERS_EXIST_0
- .toLocalizedString(buffer.toString());
- }
- ex = new IOException(ioMsg);
- // Set the serverLocation to null so that a new connection can be
- // obtained in next attempt
+ // Set the serverLocation to null so that a new connection can be obtained in next attempt
this.sender.setServerLocation(null);
- if (this.failedConnectCount == 1) {
+
+ // Log the exception if necessary
+ if (logConnectionFailure()) {
// only log this message once; another msg is logged once we connect
logger.warn(LocalizedMessage.create(
- LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT,
- this.processor.getSender().getId()));
-
+ LocalizedStrings.GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1,
+ new Object[] {this.processor.getSender().getId(), gse.getCause().getMessage()}));
}
- // Wrap the IOException in a GatewayException so it can be processed the
- // same as the other exceptions that might occur in sendBatch.
- throw new GatewaySenderException(
- LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT
- .toLocalizedString(this.processor.getSender().getId()),
- ex);
+
+ // Increment failed connection count
+ this.failedConnectCount++;
+
+ // Throw the exception
+ throw gse;
}
if (this.failedConnectCount > 0) {
- Object[] logArgs = new Object[] {this.processor.getSender().getId(), con,
- Integer.valueOf(this.failedConnectCount)};
+ Object[] logArgs =
+ new Object[] {this.processor.getSender().getId(), con, this.failedConnectCount};
logger.info(LocalizedMessage.create(
LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1_AFTER_2_FAILED_CONNECT_ATTEMPTS,
logArgs));
@@ -496,14 +464,47 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
this.processor.checkIfPdxNeedsResend(this.connection.getQueueStatus().getPdxSize());
} catch (ConnectionDestroyedException e) {
throw new GatewaySenderException(
- LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT
- .toLocalizedString(this.processor.getSender().getId()),
+ LocalizedStrings.GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1.toLocalizedString(
+ new Object[] {this.processor.getSender().getId(), e.getMessage()}),
e);
} finally {
this.connectionLifeCycleLock.writeLock().unlock();
}
}
+ private GatewaySenderException getInitializeConnectionExceptionToThrow(
+ ServerConnectivityException e) {
+ GatewaySenderException gse = null;
+ if (e.getCause() instanceof GemFireSecurityException) {
+ gse = new GatewaySenderException(e.getCause());
+ } else {
+ List<ServerLocation> servers = this.sender.getProxy().getCurrentServers();
+ String ioMsg;
+ if (servers.size() == 0) {
+ ioMsg = LocalizedStrings.GatewayEventRemoteDispatcher_THERE_ARE_NO_ACTIVE_SERVERS
+ .toLocalizedString();
+ } else {
+ final StringBuilder buffer = new StringBuilder();
+ for (ServerLocation server : servers) {
+ String endpointName = String.valueOf(server);
+ if (buffer.length() > 0) {
+ buffer.append(", ");
+ }
+ buffer.append(endpointName);
+ }
+ ioMsg =
+ LocalizedStrings.GatewayEventRemoteDispatcher_NO_AVAILABLE_CONNECTION_WAS_FOUND_BUT_THE_FOLLOWING_ACTIVE_SERVERS_EXIST_0
+ .toLocalizedString(buffer.toString());
+ }
+ IOException ex = new IOException(ioMsg);
+ gse = new GatewaySenderException(
+ LocalizedStrings.GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1.toLocalizedString(
+ new Object[] {this.processor.getSender().getId(), ex.getMessage()}),
+ ex);
+ }
+ return gse;
+ }
+
protected boolean logConnectionFailure() {
// always log the first failure
if (logger.isDebugEnabled() || this.failedConnectCount == 0) {
diff --git a/geode-wan/src/test/java/org/apache/geode/cache/wan/WANRollingUpgradeDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/cache/wan/WANRollingUpgradeDUnitTest.java
index e0b7dfd..cf7c7b1 100644
--- a/geode-wan/src/test/java/org/apache/geode/cache/wan/WANRollingUpgradeDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/cache/wan/WANRollingUpgradeDUnitTest.java
@@ -14,19 +14,27 @@
*/
package org.apache.geode.cache.wan;
+import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_START;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.logging.log4j.Logger;
import org.awaitility.Awaitility;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -43,8 +51,6 @@ import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.Locator;
-import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.distributed.internal.DistributionConfigImpl;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.internal.AvailablePort;
import org.apache.geode.internal.AvailablePortHelper;
@@ -52,7 +58,8 @@ import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.parallel.BatchRemovalThreadHelper;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
-import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.management.internal.cli.i18n.CliStrings;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
@@ -64,6 +71,7 @@ import org.apache.geode.test.dunit.standalone.VersionManager;
import org.apache.geode.test.junit.categories.BackwardCompatibilityTest;
import org.apache.geode.test.junit.categories.DistributedTest;
import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
@SuppressWarnings("ConstantConditions")
@@ -85,6 +93,9 @@ public class WANRollingUpgradeDUnitTest extends JUnit4CacheTestCase {
// the old version of Geode we're testing against
private String oldVersion;
+ @Rule
+ public transient GfshCommandRule gfsh = new GfshCommandRule();
+
public WANRollingUpgradeDUnitTest(String version) {
oldVersion = version;
}
@@ -500,7 +511,7 @@ public class WANRollingUpgradeDUnitTest extends JUnit4CacheTestCase {
VM site1Server2 = host.getVM(oldVersion, 2);
VM site1Client = host.getVM(oldVersion, 3);
- // Get old site members
+ // Get current site members
VM site2Locator = host.getVM(VersionManager.CURRENT_VERSION, 4);
VM site2Server1 = host.getVM(VersionManager.CURRENT_VERSION, 5);
VM site2Server2 = host.getVM(VersionManager.CURRENT_VERSION, 6);
@@ -512,7 +523,7 @@ public class WANRollingUpgradeDUnitTest extends JUnit4CacheTestCase {
final String site1Locators = hostName + "[" + site1LocatorPort + "]";
final int site1DistributedSystemId = 0;
- // Get old site locator properties
+ // Get current site locator properties
final int site2LocatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
DistributedTestUtils.deleteLocatorStateFile(site2LocatorPort);
final String site2Locators = hostName + "[" + site2LocatorPort + "]";
@@ -530,7 +541,7 @@ public class WANRollingUpgradeDUnitTest extends JUnit4CacheTestCase {
!InternalLocator.getLocator().getConfig().getEnableClusterConfiguration()
|| InternalLocator.getLocator().isSharedConfigurationRunning())));
- // Start old site locator
+ // Start current site locator
site2Locator.invoke(() -> startLocator(site2LocatorPort, site2DistributedSystemId,
site2Locators, site1Locators));
@@ -548,12 +559,12 @@ public class WANRollingUpgradeDUnitTest extends JUnit4CacheTestCase {
rollStartAndConfigureServerToCurrent(site1Server2, site1Locators, site2DistributedSystemId,
regionName, site1SenderId, ParallelGatewaySenderQueue.DEFAULT_MESSAGE_SYNC_INTERVAL);
- // Start and configure old site servers
+ // Start and configure current site servers
String site2SenderId = getName() + "_gatewaysender_" + site1DistributedSystemId;
startAndConfigureServers(site2Server1, site2Server2, site2Locators, site1DistributedSystemId,
regionName, site2SenderId, ParallelGatewaySenderQueue.DEFAULT_MESSAGE_SYNC_INTERVAL);
- // Do puts from mixed site client and verify events on old site
+ // Do puts from mixed site client and verify events on current site
int numPuts = 100;
doClientPutsAndVerifyEvents(site1Client, site1Server1, site1Server2, site2Server1, site2Server2,
hostName, site1LocatorPort, regionName, numPuts, site1SenderId, false);
@@ -561,15 +572,31 @@ public class WANRollingUpgradeDUnitTest extends JUnit4CacheTestCase {
private void startLocator(int port, int distributedSystemId, String locators,
String remoteLocators) throws IOException {
- Properties props = new Properties();
- props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
- props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME,
- String.valueOf(distributedSystemId));
- props.setProperty(DistributionConfig.LOCATORS_NAME, locators);
- props.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME, remoteLocators);
- props.setProperty(DistributionConfig.LOG_LEVEL_NAME, DUnitLauncher.logLevel);
- props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+ Properties props = getLocatorProperties(distributedSystemId, locators, remoteLocators);
+ Locator.startLocatorAndDS(port, null, props);
+ }
+
+ private int startLocatorWithJmxManager(int port, int distributedSystemId, String locators,
+ String remoteLocators) throws IOException {
+ Properties props = getLocatorProperties(distributedSystemId, locators, remoteLocators);
+ int jmxPort = AvailablePortHelper.getRandomAvailableTCPPort();
+ props.put(JMX_MANAGER_PORT, String.valueOf(jmxPort));
+ props.put(JMX_MANAGER, "true");
+ props.put(JMX_MANAGER_START, "true");
Locator.startLocatorAndDS(port, null, props);
+ return jmxPort;
+ }
+
+ private Properties getLocatorProperties(int distributedSystemId, String locators,
+ String remoteLocators) {
+ Properties props = new Properties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(DISTRIBUTED_SYSTEM_ID, String.valueOf(distributedSystemId));
+ props.setProperty(LOCATORS, locators);
+ props.setProperty(REMOTE_LOCATORS, remoteLocators);
+ props.setProperty(LOG_LEVEL, DUnitLauncher.logLevel);
+ props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
+ return props;
}
private void stopLocator() throws Exception {
@@ -707,11 +734,92 @@ public class WANRollingUpgradeDUnitTest extends JUnit4CacheTestCase {
}
}
+ @Test
+ public void testCreateGatewaySenderMixedSiteOneCurrentSiteTwo() throws Exception {
+ final Host host = Host.getHost(0);
+
+ // Get mixed site members
+ VM site1Locator = host.getVM(oldVersion, 0);
+ VM site1Server1 = host.getVM(oldVersion, 1);
+ VM site1Server2 = host.getVM(oldVersion, 2);
+
+ // Get current site members
+ VM site2Locator = host.getVM(VersionManager.CURRENT_VERSION, 4);
+ VM site2Server1 = host.getVM(VersionManager.CURRENT_VERSION, 5);
+ VM site2Server2 = host.getVM(VersionManager.CURRENT_VERSION, 6);
+
+ // Get mixed site locator properties
+ String hostName = NetworkUtils.getServerHostName(host);
+ final int site1LocatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ DistributedTestUtils.deleteLocatorStateFile(site1LocatorPort);
+ final String site1Locators = hostName + "[" + site1LocatorPort + "]";
+ final int site1DistributedSystemId = 0;
+
+ // Get current site locator properties
+ final int site2LocatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ DistributedTestUtils.deleteLocatorStateFile(site2LocatorPort);
+ final String site2Locators = hostName + "[" + site2LocatorPort + "]";
+ final int site2DistributedSystemId = 1;
+
+ // Start mixed site locator
+ site1Locator.invoke(() -> startLocator(site1LocatorPort, site1DistributedSystemId,
+ site1Locators, site2Locators));
+
+ // Locators before 1.4 handled configuration asynchronously.
+ // We must wait for the cluster configuration to be ready, or confirm that it is disabled.
+ site1Locator.invoke(
+ () -> Awaitility.await().atMost(65, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
+ .until(() -> assertTrue(
+ !InternalLocator.getLocator().getConfig().getEnableClusterConfiguration()
+ || InternalLocator.getLocator().isSharedConfigurationRunning())));
+
+ // Start current site locator
+ site2Locator.invoke(() -> startLocator(site2LocatorPort, site2DistributedSystemId,
+ site2Locators, site1Locators));
+
+ // Start current site servers with receivers
+ site2Server1.invoke(() -> createCache(site2Locators));
+ site2Server1.invoke(() -> createGatewayReceiver());
+ site2Server2.invoke(() -> createCache(site2Locators));
+ site2Server2.invoke(() -> createGatewayReceiver());
+
+ // Start mixed site servers
+ site1Server1.invoke(() -> createCache(site1Locators));
+ site1Server2.invoke(() -> createCache(site1Locators));
+
+ // Roll mixed site locator to current with jmx manager
+ site1Locator.invoke(() -> stopLocator());
+ VM site1RolledLocator = host.getVM(VersionManager.CURRENT_VERSION, site1Locator.getId());
+ int jmxManagerPort =
+ site1RolledLocator.invoke(() -> startLocatorWithJmxManager(site1LocatorPort,
+ site1DistributedSystemId, site1Locators, site2Locators));
+
+ // Roll one mixed site server to current
+ site1Server2.invoke(() -> closeCache());
+ VM site1Server2RolledServer = host.getVM(VersionManager.CURRENT_VERSION, site1Server2.getId());
+ site1Server2RolledServer.invoke(() -> createCache(site1Locators));
+
+ // Use gfsh to attempt to create a gateway sender in the mixed site servers
+ this.gfsh.connectAndVerify(jmxManagerPort, GfshCommandRule.PortType.jmxManager);
+ this.gfsh
+ .executeAndAssertThat(getCreateGatewaySenderCommand("toSite2", site2DistributedSystemId))
+ .statusIsError()
+ .containsOutput(CliStrings.CREATE_GATEWAYSENDER__MSG__CAN_NOT_CREATE_DIFFERENT_VERSIONS);
+ }
+
+ private String getCreateGatewaySenderCommand(String id, int remoteDsId) {
+ CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER);
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__ID, id);
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID,
+ String.valueOf(remoteDsId));
+ return csb.toString();
+ }
+
private void createCache(String locators) {
Properties props = new Properties();
- props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
- props.setProperty(DistributionConfig.LOCATORS_NAME, locators);
- props.setProperty(DistributionConfig.LOG_LEVEL_NAME, DUnitLauncher.logLevel);
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(LOCATORS, locators);
+ props.setProperty(LOG_LEVEL, DUnitLauncher.logLevel);
getCache(props);
}
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 993133c..226595b 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -36,6 +36,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.OFF_HEAP_MEMO
import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
import static org.apache.geode.test.dunit.Host.getHost;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -3295,54 +3296,35 @@ public class WANTestBase extends DistributedTestCase {
* @param senderId
*/
public static void verifySenderPausedState(String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = cache.getGatewaySender(senderId);
assertTrue(sender.isPaused());
}
public static void verifySenderResumedState(String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = cache.getGatewaySender(senderId);
assertFalse(sender.isPaused());
assertTrue(sender.isRunning());
}
public static void verifySenderStoppedState(String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = cache.getGatewaySender(senderId);
assertFalse(sender.isRunning());
}
public static void verifySenderRunningState(String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = cache.getGatewaySender(senderId);
assertTrue(sender.isRunning());
}
+ public static void verifySenderConnectedState(String senderId, boolean shouldBeConnected) {
+ AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
+ if (shouldBeConnected) {
+ assertThat(sender.getEventProcessor().getDispatcher().isConnectedToRemote()).isTrue();
+ } else {
+ assertThat(sender.getEventProcessor().getDispatcher().isConnectedToRemote()).isFalse();
+ }
+ }
+
public static void verifyPool(String senderId, boolean poolShouldExist,
int expectedPoolLocatorsSize) {
AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
index 6d77fbb..4f18363 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
@@ -223,15 +223,14 @@ public class NewWanAuthenticationDUnitTest extends WANTestBase {
() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap()));
logger.info("Created RR in vm3");
- try {
- vm2.invoke(() -> WANTestBase.startSender("ln"));
- fail(
- "Authentication Failed: While starting the sender, an exception should have been thrown");
- } catch (Exception e) {
- if (!(e.getCause().getCause() instanceof AuthenticationFailedException)) {
- fail("Authentication is not working as expected", e);
- }
- }
+ // Start sender
+ vm2.invoke(() -> WANTestBase.startSender("ln"));
+
+ // Verify the sender is started
+ vm2.invoke(() -> verifySenderRunningState("ln"));
+
+ // Verify the sender is not connected
+ vm2.invoke(() -> verifySenderConnectedState("ln", false));
}
/**
@@ -258,28 +257,61 @@ public class NewWanAuthenticationDUnitTest extends WANTestBase {
vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(props2, null, nyPort));
logger.info("Created secured cache in vm3");
- vm2.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true));
+ String senderId = "ln";
+ vm2.invoke(
+ () -> WANTestBase.createSender(senderId, 2, false, 100, 10, false, false, null, true));
logger.info("Created sender in vm2");
vm3.invoke(() -> createReceiverInSecuredCache());
logger.info("Created receiver in vm3");
- vm2.invoke(
- () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap()));
+ String regionName = getTestMethodName() + "_RR";
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, senderId, isOffHeap()));
logger.info("Created RR in vm2");
- vm3.invoke(
- () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap()));
+ vm3.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
logger.info("Created RR in vm3");
- try {
- vm2.invoke(() -> WANTestBase.startSender("ln"));
- fail(
- "Authentication Failed: While starting the sender, an exception should have been thrown");
- } catch (Exception e) {
- if (!(e.getCause().getCause() instanceof AuthenticationFailedException)) {
- fail("Authentication is not working as expected", e);
- }
- }
+ // Start sender
+ vm2.invoke(() -> WANTestBase.startSender(senderId));
+
+ // Verify the sender is started
+ vm2.invoke(() -> verifySenderRunningState(senderId));
+
+ // Verify the sender is not connected
+ vm2.invoke(() -> verifySenderConnectedState(senderId, false));
+
+ // Do some puts in the sender
+ int numPuts = 10;
+ vm2.invoke(() -> WANTestBase.doPuts(regionName, numPuts));
+
+ // Verify the sender is still started
+ vm2.invoke(() -> verifySenderRunningState(senderId));
+
+ // Verify the sender is still not connected
+ vm2.invoke(() -> verifySenderConnectedState(senderId, false));
+
+ // Verify the sender queue size
+ vm2.invoke(() -> testQueueSize(senderId, numPuts));
+
+ // Stop the receiver
+ vm3.invoke(() -> closeCache());
+
+ // Restart the receiver with a SecurityManager that accepts the existing sender's username and
+ // password. The file
+ // NewWanAuthenticationDUnitTest.testWanSecurityManagerWithInvalidCredentials.security.json
+ // contains the admin user definition that the SecurityManager will accept.
+ String securityJsonRersource = "org/apache/geode/internal/cache/wan/misc/"
+ + getClass().getSimpleName() + "." + getTestMethodName() + ".security.json";
+ Properties propsRestart = buildSecurityProperties("guest", "guest", securityJsonRersource);
+ vm3.invoke(() -> createSecuredCache(propsRestart, null, nyPort));
+ vm3.invoke(() -> createReplicatedRegion(regionName, null, isOffHeap()));
+ vm3.invoke(() -> createReceiverInSecuredCache());
+
+ // Wait for the queue to drain
+ vm2.invoke(() -> checkQueueSize(senderId, 0));
+
+ // Verify region size on receiver
+ vm3.invoke(() -> validateRegionSize(regionName, numPuts));
}
private static Properties buildProperties(String clientauthenticator, String clientAuthInit,
@@ -304,9 +336,15 @@ public class NewWanAuthenticationDUnitTest extends WANTestBase {
}
private static Properties buildSecurityProperties(String username, String password) {
+ return buildSecurityProperties(username, password,
+ "org/apache/geode/security/templates/security.json");
+ }
+
+ private static Properties buildSecurityProperties(String username, String password,
+ String securityJsonResource) {
Properties props = new Properties();
props.put(SECURITY_MANAGER, TestSecurityManager.class.getName());
- props.put("security-json", "org/apache/geode/security/templates/security.json");
+ props.put("security-json", securityJsonResource);
props.put(SECURITY_CLIENT_AUTH_INIT, UserPasswdAI.class.getName());
props.put("security-username", username);
props.put("security-password", password);
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANSSLDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANSSLDUnitTest.java
index 8f3d53f..a1976b1 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANSSLDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANSSLDUnitTest.java
@@ -21,13 +21,11 @@ import org.junit.experimental.categories.Category;
import org.apache.geode.cache.Region;
import org.apache.geode.internal.cache.wan.WANTestBase;
+import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
import org.apache.geode.test.junit.categories.WanTest;
@Category({DistributedTest.class, WanTest.class})
@@ -66,31 +64,58 @@ public class WANSSLDUnitTest extends WANTestBase {
IgnoredException.addIgnoredException("Unexpected IOException");
IgnoredException.addIgnoredException("SSL Error");
IgnoredException.addIgnoredException("Unrecognized SSL message");
- try {
- Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
- vm2.invoke(() -> WANTestBase.createReceiverWithSSL(nyPort));
+ Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
- vm4.invoke(() -> WANTestBase.createCache(lnPort));
+ vm2.invoke(() -> WANTestBase.createReceiverWithSSL(nyPort));
- vm4.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true));
+ vm4.invoke(() -> WANTestBase.createCache(lnPort));
- vm2.invoke(
- () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap()));
+ String senderId = "ln";
+ vm4.invoke(
+ () -> WANTestBase.createSender(senderId, 2, false, 100, 10, false, false, null, true));
- vm4.invoke(() -> WANTestBase.startSender("ln"));
+ String regionName = getTestMethodName() + "_RR";
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
- vm4.invoke(
- () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap()));
+ vm4.invoke(() -> WANTestBase.startSender(senderId));
- vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+ // Verify the sender is started
+ vm4.invoke(() -> verifySenderRunningState(senderId));
- vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 1000));
- fail("Expected exception as only Receiver is SSL enabled. Not Sender");
- } catch (Exception e) {
- assertTrue(e.getCause().getMessage().contains("Server expecting SSL connection"));
- }
+ // Verify the sender is not connected
+ vm4.invoke(() -> verifySenderConnectedState(senderId, false));
+
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+ // Do some puts in the sender
+ int numPuts = 10;
+ vm4.invoke(() -> WANTestBase.doPuts(regionName, numPuts));
+
+ // Verify the sender is still started
+ vm4.invoke(() -> verifySenderRunningState(senderId));
+
+ // Verify the sender is still not connected
+ vm4.invoke(() -> verifySenderConnectedState(senderId, false));
+
+ // Verify the sender queue size
+ vm4.invoke(() -> testQueueSize(senderId, numPuts));
+
+ // Stop the receiver
+ vm2.invoke(() -> closeCache());
+ vm2.invoke(() -> closeSocketCreatorFactory());
+
+ // Restart the receiver with SSL disabled
+ createCacheInVMs(nyPort, vm2);
+ vm2.invoke(() -> createReplicatedRegion(regionName, null, isOffHeap()));
+ vm2.invoke(() -> createReceiver());
+
+ // Wait for the queue to drain
+ vm4.invoke(() -> checkQueueSize(senderId, 0));
+
+ // Verify region size on receiver
+ vm2.invoke(() -> validateRegionSize(regionName, numPuts));
}
@Test
@@ -144,4 +169,8 @@ public class WANSSLDUnitTest extends WANTestBase {
}
return false;
}
+
+ private void closeSocketCreatorFactory() {
+ SocketCreatorFactory.close();
+ }
}
diff --git a/geode-wan/src/test/resources/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.testWanSecurityManagerWithInvalidCredentials.security.json b/geode-wan/src/test/resources/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.testWanSecurityManagerWithInvalidCredentials.security.json
new file mode 100644
index 0000000..8fa76d8
--- /dev/null
+++ b/geode-wan/src/test/resources/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.testWanSecurityManagerWithInvalidCredentials.security.json
@@ -0,0 +1,18 @@
+{
+ "roles": [
+ {
+ "name": "admin",
+ "operationsAllowed": [
+ "CLUSTER:MANAGE",
+ "DATA:MANAGE"
+ ]
+ }
+ ],
+ "users": [
+ {
+ "name": "admin",
+ "password": "wrongPswd",
+ "roles": ["admin"]
+ }
+ ]
+}
--
To stop receiving notification emails like this one, please contact
boglesby@apache.org.