You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2020/07/02 18:10:52 UTC
[geode] 16/29: GEODE-8294 - Integrated ModuleService into
geode-membership.
This is an automated email from the ASF dual-hosted git repository.
udo pushed a commit to branch feature/GEODE-8294
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 72a41bdb205ed36c43e02f740f887a6cba0dab92
Author: Patrick Johnson <pj...@pivotal.io>
AuthorDate: Wed Jun 24 12:39:46 2020 -0700
GEODE-8294 - Integrated ModuleService into geode-membership.
---
.../geode/services/module/ModuleService.java | 100 ++++---
...utedSystemBuilderForTestingIntegrationTest.java | 2 +-
...nalDistributedSystemBuilderIntegrationTest.java | 6 +-
.../InternalDistributedSystemJUnitTest.java | 4 +-
...DistributedSystemLockMemoryIntegrationTest.java | 10 +-
...alDistributedSystemSecurityIntegrationTest.java | 4 +-
.../internal/membership/MembershipJUnitTest.java | 4 +-
.../geode/cache/client/ClientCacheFactory.java | 2 +-
.../geode/distributed/DistributedSystem.java | 4 +-
.../distributed/internal/DistributionImpl.java | 3 +-
.../internal/InternalDistributedSystem.java | 5 +
.../distributed/internal/InternalLocator.java | 2 +-
...rnalDistributedSystemStatisticsManagerTest.java | 11 +-
geode-membership/build.gradle | 7 +
.../membership/gms/GMSMembershipJUnitTest.java | 3 +-
.../membership/gms/MembershipIntegrationTest.java | 3 +-
.../gms/fd/GMSHealthMonitorJUnitTest.java | 3 +-
.../locator/GMSLocatorRecoveryIntegrationTest.java | 3 +-
.../gms/membership/GMSJoinLeaveJUnitTest.java | 5 +-
.../gms/messenger/JGroupsMessengerJUnitTest.java | 5 +-
.../gms/messenger/StatRecorderJUnitTest.java | 5 +-
.../internal/membership/api/MembershipBuilder.java | 5 +-
.../internal/membership/gms/GMSMembership.java | 4 +-
.../membership/gms/MembershipBuilderImpl.java | 9 +-
.../internal/membership/gms/Services.java | 11 +-
.../membership/gms/fd/GMSHealthMonitor.java | 18 +-
.../membership/gms/interfaces/Service.java | 3 +-
.../membership/gms/membership/GMSJoinLeave.java | 220 +++++++--------
.../membership/gms/messenger/JGroupsMessenger.java | 311 +++++++++------------
.../module/impl/JBossModuleServiceImpl.java | 18 ++
30 files changed, 403 insertions(+), 387 deletions(-)
diff --git a/geode-common-services/src/main/java/org/apache/geode/services/module/ModuleService.java b/geode-common-services/src/main/java/org/apache/geode/services/module/ModuleService.java
index 29b7fd4..9505a64 100644
--- a/geode-common-services/src/main/java/org/apache/geode/services/module/ModuleService.java
+++ b/geode-common-services/src/main/java/org/apache/geode/services/module/ModuleService.java
@@ -15,6 +15,7 @@
package org.apache.geode.services.module;
+import java.io.InputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -34,51 +35,72 @@ import org.apache.geode.services.result.impl.Success;
@Experimental
public interface ModuleService {
- static ModuleService getDefaultModuleService() {
- return new ModuleService() {
- @Override
- public ModuleServiceResult<Boolean> loadModule(ModuleDescriptor moduleDescriptor) {
- return Failure.of("This features is not implemented for a default ModuleService");
+ /**
+ * Default instance of ModuleService using "standard" ClassLoader resolution.
+ */
+ ModuleService DEFAULT = new ModuleService() {
+ @Override
+ public ModuleServiceResult<Boolean> loadModule(ModuleDescriptor moduleDescriptor) {
+ return Failure.of("This features is not implemented for a default ModuleService");
+ }
+
+ @Override
+ public ModuleServiceResult<Boolean> registerModule(ModuleDescriptor moduleDescriptor) {
+ return Failure.of("This features is not implemented for a default ModuleService");
+ }
+
+ @Override
+ public ModuleServiceResult<Boolean> unloadModule(String moduleName) {
+ return Failure.of("This features is not implemented for a default ModuleService");
+ }
+
+ @Override
+ public <T> ModuleServiceResult<Set<T>> loadService(Class<T> service) {
+ Set<T> result1 = new HashSet<>();
+ try {
+ ServiceLoader.load(service).forEach(result1::add);
+ } catch (Exception e) {
+ return Failure.of(e.toString());
}
-
- @Override
- public ModuleServiceResult<Boolean> registerModule(ModuleDescriptor moduleDescriptor) {
- return Failure.of("This features is not implemented for a default ModuleService");
+ return Success.of(result1);
+ }
+
+ @Override
+ public ModuleServiceResult<Class<?>> loadClass(String className,
+ ModuleDescriptor moduleDescriptor) {
+ return Failure.of("This features is not implemented for a default ModuleService");
+ }
+
+ @Override
+ public ModuleServiceResult<List<Class<?>>> loadClass(String className) {
+ try {
+ return Success.of(Collections.singletonList(
+ this.getClass().getClassLoader().loadClass(className)));
+ } catch (ClassNotFoundException e) {
+ return Failure.of(e.toString());
}
-
- @Override
- public ModuleServiceResult<Boolean> unloadModule(String moduleName) {
- return Failure.of("This features is not implemented for a default ModuleService");
+ }
+
+ @Override
+ public ModuleServiceResult<List<InputStream>> findResourceAsStream(String resourceFile) {
+ InputStream inputStream = null;
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ if (contextClassLoader != null) {
+ inputStream = contextClassLoader.getResourceAsStream(resourceFile);
}
- @Override
- public <T> ModuleServiceResult<Set<T>> loadService(Class<T> service) {
- Set<T> result = new HashSet<>();
- try {
- ServiceLoader.load(service).forEach(result::add);
- } catch (Exception e) {
- return Failure.of(e.toString());
- }
- return Success.of(result);
+ if (inputStream == null) {
+ inputStream = getClass().getResourceAsStream(resourceFile);
}
-
- @Override
- public ModuleServiceResult<Class<?>> loadClass(String className,
- ModuleDescriptor moduleDescriptor) {
- return Failure.of("This features is not implemented for a default ModuleService");
+ if (inputStream == null) {
+ inputStream = ClassLoader.getSystemResourceAsStream(resourceFile);
}
- @Override
- public ModuleServiceResult<List<Class<?>>> loadClass(String className) {
- try {
- return Success.of(Collections.singletonList(
- this.getClass().getClassLoader().loadClass(className)));
- } catch (ClassNotFoundException e) {
- return Failure.of(e.toString());
- }
- }
- };
- }
+ return inputStream == null
+ ? Failure.of(String.format("No resource for path: %s could be found", resourceFile))
+ : Success.of(Collections.singletonList(inputStream));
+ }
+ };
/**
* Loads a module from a resource.
@@ -172,4 +194,6 @@ public interface ModuleService {
* failure.
*/
ModuleServiceResult<List<Class<?>>> loadClass(String className);
+
+ ModuleServiceResult<List<InputStream>> findResourceAsStream(String resourceFile);
}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/InternalDistributedSystemBuilderForTestingIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/InternalDistributedSystemBuilderForTestingIntegrationTest.java
index acbd405..183a48f 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/InternalDistributedSystemBuilderForTestingIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/InternalDistributedSystemBuilderForTestingIntegrationTest.java
@@ -46,7 +46,7 @@ public class InternalDistributedSystemBuilderForTestingIntegrationTest {
InternalDistributedSystem system =
new InternalDistributedSystem.BuilderForTesting(configProperties,
- ModuleService.getDefaultModuleService())
+ ModuleService.DEFAULT)
.setDistributionManager(distributionManager)
.setStatisticsManagerFactory(statisticsManagerFactory)
.build();
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/InternalDistributedSystemBuilderIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/InternalDistributedSystemBuilderIntegrationTest.java
index cfcd2f6..0a90db7 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/InternalDistributedSystemBuilderIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/InternalDistributedSystemBuilderIntegrationTest.java
@@ -54,8 +54,8 @@ public class InternalDistributedSystemBuilderIntegrationTest {
configProperties.setProperty(NAME, theName);
system =
- new InternalDistributedSystem.Builder(configProperties, metricsSessionBuilder, ModuleService
- .getDefaultModuleService())
+ new InternalDistributedSystem.Builder(configProperties, metricsSessionBuilder,
+ ModuleService.DEFAULT)
.build();
assertThat(system.isConnected()).isTrue();
@@ -71,7 +71,7 @@ public class InternalDistributedSystemBuilderIntegrationTest {
Properties configProperties = new Properties();
system = new InternalDistributedSystem.Builder(configProperties, metricsSessionBuilder,
- ModuleService.getDefaultModuleService())
+ ModuleService.DEFAULT)
.setSecurityConfig(securityConfig)
.build();
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/InternalDistributedSystemJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/InternalDistributedSystemJUnitTest.java
index c634a7a..fd0a26a 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/InternalDistributedSystemJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/InternalDistributedSystemJUnitTest.java
@@ -95,7 +95,7 @@ public class InternalDistributedSystemJUnitTest {
private InternalDistributedSystem createSystem(Properties props,
MetricsService.Builder metricsSessionBuilder) {
this.system = new InternalDistributedSystem.Builder(props, metricsSessionBuilder,
- ModuleService.getDefaultModuleService())
+ ModuleService.DEFAULT)
.build();
return this.system;
}
@@ -662,7 +662,7 @@ public class InternalDistributedSystemJUnitTest {
when(metricsSessionBuilder.build(any())).thenReturn(mock(MetricsService.class));
InternalDistributedSystem sys =
new InternalDistributedSystem.Builder(config1.toProperties(), metricsSessionBuilder,
- ModuleService.getDefaultModuleService())
+ ModuleService.DEFAULT)
.build();
try {
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/InternalDistributedSystemLockMemoryIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/InternalDistributedSystemLockMemoryIntegrationTest.java
index f1a0855..a2ca86e 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/InternalDistributedSystemLockMemoryIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/InternalDistributedSystemLockMemoryIntegrationTest.java
@@ -61,7 +61,7 @@ public class InternalDistributedSystemLockMemoryIntegrationTest {
public void lockMemoryAllowedIfAllowMemoryOverCommitIsSet() {
System.setProperty(GeodeGlossary.GEMFIRE_PREFIX + "Cache.ALLOW_MEMORY_OVERCOMMIT", "true");
system = spy(new InternalDistributedSystem.Builder(new Properties(), builder,
- ModuleService.getDefaultModuleService()).build());
+ ModuleService.DEFAULT).build());
doNothing().when(system).lockMemory();
system.lockMemory(100, 200);
@@ -74,7 +74,7 @@ public class InternalDistributedSystemLockMemoryIntegrationTest {
System.setProperty(
GeodeGlossary.GEMFIRE_PREFIX + "Cache.AVOID_MEMORY_LOCK_WHEN_OVERCOMMIT", "true");
system = spy(new InternalDistributedSystem.Builder(new Properties(), builder,
- ModuleService.getDefaultModuleService()).build());
+ ModuleService.DEFAULT).build());
system.lockMemory(100, 200);
@@ -87,7 +87,7 @@ public class InternalDistributedSystemLockMemoryIntegrationTest {
System.setProperty(
GeodeGlossary.GEMFIRE_PREFIX + "Cache.AVOID_MEMORY_LOCK_WHEN_OVERCOMMIT", "true");
system = spy(new InternalDistributedSystem.Builder(new Properties(), builder,
- ModuleService.getDefaultModuleService()).build());
+ ModuleService.DEFAULT).build());
system.lockMemory(100, 200);
@@ -98,7 +98,7 @@ public class InternalDistributedSystemLockMemoryIntegrationTest {
@Test
public void lockMemoryThrowsIfMemoryOverCommit() {
system = spy(new InternalDistributedSystem.Builder(new Properties(), builder,
- ModuleService.getDefaultModuleService()).build());
+ ModuleService.DEFAULT).build());
Throwable caughtException = catchThrowable(() -> system.lockMemory(100, 200));
@@ -109,7 +109,7 @@ public class InternalDistributedSystemLockMemoryIntegrationTest {
@Test
public void locksMemoryIfMemoryNotOverCommit() {
system = spy(new InternalDistributedSystem.Builder(new Properties(), builder,
- ModuleService.getDefaultModuleService()).build());
+ ModuleService.DEFAULT).build());
doNothing().when(system).lockMemory();
system.lockMemory(200, 100);
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/InternalDistributedSystemSecurityIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/InternalDistributedSystemSecurityIntegrationTest.java
index 4fb1130..be5e529 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/InternalDistributedSystemSecurityIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/InternalDistributedSystemSecurityIntegrationTest.java
@@ -59,7 +59,7 @@ public class InternalDistributedSystemSecurityIntegrationTest {
system =
InternalDistributedSystem.connectInternal(configProperties, securityConfig,
- metricsSessionBuilder, ModuleService.getDefaultModuleService());
+ metricsSessionBuilder, ModuleService.DEFAULT);
system.disconnect();
@@ -76,7 +76,7 @@ public class InternalDistributedSystemSecurityIntegrationTest {
system = InternalDistributedSystem.connectInternal(
configProperties, securityConfig, metricsSessionBuilder,
- ModuleService.getDefaultModuleService());
+ ModuleService.DEFAULT);
system.disconnect();
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/MembershipJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/MembershipJUnitTest.java
index cb3c19a..688fd10 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/MembershipJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/MembershipJUnitTest.java
@@ -78,6 +78,7 @@ import org.apache.geode.internal.security.SecurableCommunicationChannel;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.internal.security.SecurityServiceFactory;
import org.apache.geode.internal.serialization.DSFIDSerializer;
+import org.apache.geode.services.module.ModuleService;
@Category({MembershipJUnitTest.class})
public class MembershipJUnitTest {
@@ -294,7 +295,8 @@ public class MembershipJUnitTest {
mockSystem.getSecurityLogWriter(), mockSystem.getInternalLogWriter());
final Membership<InternalDistributedMember> m1 =
MembershipBuilder.<InternalDistributedMember>newMembershipBuilder(
- socketCreator, locatorClient, serializer, memberIdentifierFactory)
+ socketCreator, locatorClient, serializer, memberIdentifierFactory,
+ ModuleService.DEFAULT)
.setMembershipLocator(locator)
.setAuthenticator(authenticator)
.setStatistics(stats1)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/ClientCacheFactory.java b/geode-core/src/main/java/org/apache/geode/cache/client/ClientCacheFactory.java
index 8a1939d..11f9413 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/ClientCacheFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/ClientCacheFactory.java
@@ -281,7 +281,7 @@ public class ClientCacheFactory {
new InternalDistributedSystemMetricsService.Builder()
.setIsClient(true);
return InternalDistributedSystem.connectInternal(dsProps, null, metricsServiceBuilder,
- ModuleService.getDefaultModuleService());
+ ModuleService.DEFAULT);
}
private PoolFactory getPoolFactory() {
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/DistributedSystem.java b/geode-core/src/main/java/org/apache/geode/distributed/DistributedSystem.java
index 96b976d..95fc1ed 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/DistributedSystem.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/DistributedSystem.java
@@ -159,7 +159,7 @@ public abstract class DistributedSystem implements StatisticsFactory {
public static DistributedSystem connect(Properties config) {
return InternalDistributedSystem.connectInternal(config, null,
new InternalDistributedSystemMetricsService.Builder(),
- ModuleService.getDefaultModuleService());
+ ModuleService.DEFAULT);
}
protected static void addSystem(InternalDistributedSystem newSystem) {
@@ -671,4 +671,6 @@ public abstract class DistributedSystem implements StatisticsFactory {
* Returns the new DistributedSystem if there was an auto-reconnect
*/
public abstract DistributedSystem getReconnectedSystem();
+
+ public abstract ModuleService getModuleService();
}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
index d63391c..875ac6b 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
@@ -139,7 +139,8 @@ public class DistributionImpl implements Distribution {
socketCreator,
locatorClient,
InternalDataSerializer.getDSFIDSerializer(),
- new ClusterDistributionManager.ClusterDistributionManagerIDFactory())
+ new ClusterDistributionManager.ClusterDistributionManagerIDFactory(),
+ system.getModuleService())
.setMembershipLocator(locator)
.setAuthenticator(
new GMSAuthenticator(system.getSecurityProperties(), system.getSecurityService(),
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
index dbb5aba..171610c 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
@@ -2954,6 +2954,11 @@ public class InternalDistributedSystem extends DistributedSystem
}
@Override
+ public ModuleService getModuleService() {
+ return moduleService;
+ }
+
+ @Override
public void stopReconnecting() {
reconnectCancelled = true;
synchronized (reconnectLock) {
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
index f2eafc9..5f5260e 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
@@ -743,7 +743,7 @@ public class InternalLocator extends Locator implements ConnectListener, LogConf
InternalDistributedSystem
.connectInternal(distributedSystemProperties, null,
new InternalDistributedSystemMetricsService.Builder(),
- ModuleService.getDefaultModuleService(),
+ ModuleService.DEFAULT,
membershipLocator);
if (peerLocator) {
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/InternalDistributedSystemStatisticsManagerTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/InternalDistributedSystemStatisticsManagerTest.java
index 88a03f2..a6eab86 100644
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/InternalDistributedSystemStatisticsManagerTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/InternalDistributedSystemStatisticsManagerTest.java
@@ -69,11 +69,10 @@ public class InternalDistributedSystemStatisticsManagerTest {
when(statisticsManagerFactory.create(any(), anyLong(), anyBoolean()))
.thenReturn(statisticsManager);
internalDistributedSystem =
- new InternalDistributedSystem.BuilderForTesting(new Properties(), ModuleService
- .getDefaultModuleService())
- .setDistributionManager(distributionManager)
- .setStatisticsManagerFactory(statisticsManagerFactory)
- .build();
+ new InternalDistributedSystem.BuilderForTesting(new Properties(), ModuleService.DEFAULT)
+ .setDistributionManager(distributionManager)
+ .setStatisticsManagerFactory(statisticsManagerFactory)
+ .build();
}
@Test
@@ -88,7 +87,7 @@ public class InternalDistributedSystemStatisticsManagerTest {
InternalDistributedSystem result =
new InternalDistributedSystem.BuilderForTesting(new Properties(),
- ModuleService.getDefaultModuleService())
+ ModuleService.DEFAULT)
.setDistributionManager(distributionManager)
.setStatisticsManagerFactory(statisticsManagerFactory)
.build();
diff --git a/geode-membership/build.gradle b/geode-membership/build.gradle
index 6f4e737..f10a5ed 100644
--- a/geode-membership/build.gradle
+++ b/geode-membership/build.gradle
@@ -26,6 +26,13 @@ dependencies {
// Geode-common has annotations and other pieces used by geode-logging
api(project(':geode-common'))
+
+ implementation(project(':geode-common-services'))
+ testImplementation(project(':geode-common-services'))
+ integrationTestImplementation(project(':geode-common-services'))
+ distributedTestImplementation(project(':geode-common-services'))
+ upgradeTestImplementation(project(':geode-common-services'))
+
implementation(project(':geode-logging'))
implementation('org.apache.logging.log4j:log4j-api')
implementation('org.jgroups:jgroups')
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java
index 49b8b3a..f958bc4 100644
--- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java
@@ -60,6 +60,7 @@ import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave
import org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger;
import org.apache.geode.internal.serialization.DSFIDSerializer;
import org.apache.geode.internal.serialization.internal.DSFIDSerializerImpl;
+import org.apache.geode.services.module.ModuleService;
import org.apache.geode.test.junit.categories.MembershipTest;
@Category({MembershipTest.class})
@@ -149,7 +150,7 @@ public class GMSMembershipJUnitTest {
messageListener = mock(MessageListener.class);
directChannelCallback = mock(LifecycleListener.class);
manager = new GMSMembership(listener, messageListener, directChannelCallback);
- manager.getGMSManager().init(services);
+ manager.getGMSManager().init(services, ModuleService.DEFAULT);
when(services.getManager()).thenReturn(manager.getGMSManager());
DSFIDSerializer serializer = new DSFIDSerializerImpl();
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipIntegrationTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipIntegrationTest.java
index c875a52..e8569cd 100644
--- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipIntegrationTest.java
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipIntegrationTest.java
@@ -54,6 +54,7 @@ import org.apache.geode.internal.serialization.DSFIDSerializer;
import org.apache.geode.internal.serialization.internal.DSFIDSerializerImpl;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.services.module.ModuleService;
/**
* Tests of using the membership APIs to make multiple Membership systems that communicate
@@ -299,7 +300,7 @@ public class MembershipIntegrationTest {
dsfidSerializer.getObjectDeserializer(), TcpSocketFactory.DEFAULT);
return MembershipBuilder.<MemberIdentifier>newMembershipBuilder(
- socketCreator, locatorClient, dsfidSerializer, memberIdFactory)
+ socketCreator, locatorClient, dsfidSerializer, memberIdFactory, ModuleService.DEFAULT)
.setMembershipLocator(embeddedLocator)
.setConfig(config)
.create();
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index f734e81..3d21026 100644
--- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -82,6 +82,7 @@ import org.apache.geode.distributed.internal.tcpserver.TcpSocketCreatorImpl;
import org.apache.geode.internal.serialization.BufferDataOutputStream;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.internal.serialization.internal.DSFIDSerializerImpl;
+import org.apache.geode.services.module.ModuleService;
import org.apache.geode.test.junit.categories.MembershipTest;
@Category({MembershipTest.class})
@@ -144,7 +145,7 @@ public class GMSHealthMonitorJUnitTest {
when(joinLeave.getMemberID()).thenReturn(mockMembers.get(myAddressIndex));
when(messenger.getMemberID()).thenReturn(mockMembers.get(myAddressIndex));
gmsHealthMonitor = new GMSHealthMonitorTest();
- gmsHealthMonitor.init(services);
+ gmsHealthMonitor.init(services, ModuleService.DEFAULT);
gmsHealthMonitor.start();
}
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorRecoveryIntegrationTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorRecoveryIntegrationTest.java
index 16b09e6..6d98054 100644
--- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorRecoveryIntegrationTest.java
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorRecoveryIntegrationTest.java
@@ -58,6 +58,7 @@ import org.apache.geode.internal.serialization.DSFIDSerializer;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.internal.serialization.internal.DSFIDSerializerImpl;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
+import org.apache.geode.services.module.ModuleService;
import org.apache.geode.test.junit.categories.MembershipTest;
@Category(MembershipTest.class)
@@ -173,7 +174,7 @@ public class GMSLocatorRecoveryIntegrationTest {
};
final Membership<MemberIdentifierImpl> membership =
MembershipBuilder.newMembershipBuilder(socketCreator, locatorClient, serializer,
- new MemberIdentifierFactoryImpl()).setConfig(membershipConfig)
+ new MemberIdentifierFactoryImpl(), ModuleService.DEFAULT).setConfig(membershipConfig)
.setMembershipLocator(locator)
.create();
membership.start();
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
index c65f8ef..14ae0ec 100644
--- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
@@ -84,6 +84,7 @@ import org.apache.geode.distributed.internal.membership.gms.util.MemberIdentifie
import org.apache.geode.distributed.internal.tcpserver.HostAndPort;
import org.apache.geode.distributed.internal.tcpserver.TcpClient;
import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.services.module.ModuleService;
import org.apache.geode.test.junit.categories.MembershipTest;
@Category({MembershipTest.class})
@@ -174,7 +175,7 @@ public class GMSJoinLeaveJUnitTest {
} else {
gmsJoinLeave = new GMSJoinLeave(locatorClient);
}
- gmsJoinLeave.init(services);
+ gmsJoinLeave.init(services, ModuleService.DEFAULT);
gmsJoinLeave.start();
gmsJoinLeave.started();
gmsJoinLeave.setLocalAddress(gmsJoinLeaveMemberId);
@@ -1562,7 +1563,7 @@ public class GMSJoinLeaveJUnitTest {
GMSJoinLeave joinLeave = new GMSJoinLeave(null);
try {
- joinLeave.init(services);
+ joinLeave.init(services, ModuleService.DEFAULT);
throw new Error(
"expected a GemFireConfigException to be thrown because no locators are configured");
} catch (MembershipConfigurationException e) {
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index fe52eb8..766edb6 100755
--- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -98,6 +98,7 @@ import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.serialization.StaticSerialization;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.internal.serialization.internal.DSFIDSerializerImpl;
+import org.apache.geode.services.module.ModuleService;
import org.apache.geode.test.junit.categories.MembershipTest;
@Category({MembershipTest.class})
@@ -193,7 +194,7 @@ public class JGroupsMessengerJUnitTest {
when(services.getStatistics()).thenReturn(new DefaultMembershipStatistics());
messenger = new JGroupsMessenger<>();
- messenger.init(services);
+ messenger.init(services, ModuleService.DEFAULT);
// if I do this earlier then test this return messenger as null
when(services.getMessenger()).thenReturn(messenger);
@@ -906,7 +907,7 @@ public class JGroupsMessengerJUnitTest {
new MembershipInformationImpl(messenger.myChannel,
new ConcurrentLinkedQueue<>(), null);
JGroupsMessenger newMessenger = new JGroupsMessenger();
- newMessenger.init(services);
+ newMessenger.init(services, ModuleService.DEFAULT);
newMessenger.start();
newMessenger.started();
newMessenger.stop();
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/StatRecorderJUnitTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/StatRecorderJUnitTest.java
index 2af832c..c9301c6 100755
--- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/StatRecorderJUnitTest.java
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/StatRecorderJUnitTest.java
@@ -38,6 +38,7 @@ import org.apache.geode.distributed.internal.membership.api.MembershipConfig;
import org.apache.geode.distributed.internal.membership.api.MembershipStatistics;
import org.apache.geode.distributed.internal.membership.gms.Services;
import org.apache.geode.distributed.internal.membership.gms.interfaces.Manager;
+import org.apache.geode.services.module.ModuleService;
import org.apache.geode.test.junit.categories.MembershipTest;
/**
@@ -167,7 +168,7 @@ public class StatRecorderJUnitTest {
JGroupsMessenger messenger = new JGroupsMessenger();
- messenger.init(mockServices);
+ messenger.init(mockServices, ModuleService.DEFAULT);
String jgroupsConfig = messenger.jgStackConfig;
System.out.println(jgroupsConfig);
assertTrue(jgroupsConfig.contains("gms.messenger.StatRecorder"));
@@ -178,7 +179,7 @@ public class StatRecorderJUnitTest {
messenger = new JGroupsMessenger();
- messenger.init(mockServices);
+ messenger.init(mockServices, ModuleService.DEFAULT);
assertTrue(jgroupsConfig.contains("gms.messenger.StatRecorder"));
}
}
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/MembershipBuilder.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/MembershipBuilder.java
index 220a0d0..816376d 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/MembershipBuilder.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/MembershipBuilder.java
@@ -19,6 +19,7 @@ import org.apache.geode.distributed.internal.membership.gms.MembershipBuilderImp
import org.apache.geode.distributed.internal.tcpserver.TcpClient;
import org.apache.geode.distributed.internal.tcpserver.TcpSocketCreator;
import org.apache.geode.internal.serialization.DSFIDSerializer;
+import org.apache.geode.services.module.ModuleService;
/**
* A builder for creating a new {@link Membership}. Use this builder to configure a
@@ -46,8 +47,8 @@ public interface MembershipBuilder<ID extends MemberIdentifier> {
final TcpSocketCreator socketCreator,
final TcpClient locatorClient,
final DSFIDSerializer serializer,
- final MemberIdentifierFactory<ID> memberFactory) {
+ final MemberIdentifierFactory<ID> memberFactory, ModuleService moduleService) {
return new MembershipBuilderImpl<>(
- socketCreator, locatorClient, serializer, memberFactory);
+ socketCreator, locatorClient, serializer, memberFactory, moduleService);
}
}
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java
index 8707467..9cb9519 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java
@@ -63,6 +63,7 @@ import org.apache.geode.distributed.internal.membership.gms.interfaces.Manager;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.logging.internal.executors.LoggingThread;
+import org.apache.geode.services.module.ModuleService;
import org.apache.geode.util.internal.GeodeGlossary;
/**
@@ -1866,7 +1867,8 @@ public class GMSMembership<ID extends MemberIdentifier> implements Membership<ID
@Override
/* Service interface */
- public void init(Services<ID> services) throws MembershipConfigurationException {
+ public void init(Services<ID> services, ModuleService moduleService)
+ throws MembershipConfigurationException {
GMSMembership.this.services = services;
MembershipConfig config = services.getConfig();
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/MembershipBuilderImpl.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/MembershipBuilderImpl.java
index 3cb692c..7ad7a1a 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/MembershipBuilderImpl.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/MembershipBuilderImpl.java
@@ -31,6 +31,7 @@ import org.apache.geode.distributed.internal.membership.gms.locator.MembershipLo
import org.apache.geode.distributed.internal.tcpserver.TcpClient;
import org.apache.geode.distributed.internal.tcpserver.TcpSocketCreator;
import org.apache.geode.internal.serialization.DSFIDSerializer;
+import org.apache.geode.services.module.ModuleService;
/**
* MembershipBuilderImpl is the implementation of MembershipBuilder. It can construct
@@ -51,15 +52,19 @@ public class MembershipBuilderImpl<ID extends MemberIdentifier> implements Membe
private MembershipLocatorImpl<ID> membershipLocator;
+ private ModuleService moduleService;
+
public MembershipBuilderImpl(
final TcpSocketCreator socketCreator,
final TcpClient locatorClient,
final DSFIDSerializer serializer,
- final MemberIdentifierFactory<ID> memberFactory) {
+ final MemberIdentifierFactory<ID> memberFactory,
+ final ModuleService moduleService) {
this.socketCreator = socketCreator;
this.locatorClient = locatorClient;
this.serializer = serializer;
this.memberFactory = memberFactory;
+ this.moduleService = moduleService;
}
@Override
@@ -116,7 +121,7 @@ public class MembershipBuilderImpl<ID extends MemberIdentifier> implements Membe
if (membershipLocator != null) {
services.setLocators(membershipLocator.getGMSLocator(), membershipLocator);
}
- services.init();
+ services.init(moduleService);
return gmsMembership;
}
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java
index 4ed15e3..b853c50 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java
@@ -75,6 +75,7 @@ import org.apache.geode.distributed.internal.tcpserver.TcpClient;
import org.apache.geode.distributed.internal.tcpserver.TcpSocketCreator;
import org.apache.geode.internal.serialization.DSFIDSerializer;
import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.services.module.ModuleService;
/**
* Services holds all of the membership services of a GMSMembership. It serves as a
@@ -184,11 +185,11 @@ public class Services<ID extends MemberIdentifier> {
/**
* Initialize services - do this before invoking start()
*/
- public void init() throws MembershipConfigurationException {
- this.messenger.init(this);
- this.manager.init(this);
- this.joinLeave.init(this);
- this.healthMon.init(this);
+ public void init(ModuleService moduleService) throws MembershipConfigurationException {
+ this.messenger.init(this, moduleService);
+ this.manager.init(this, moduleService);
+ this.joinLeave.init(this, moduleService);
+ this.healthMon.init(this, moduleService);
}
/**
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index bd95e57..3a9a43b 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -73,6 +73,7 @@ import org.apache.geode.distributed.internal.tcpserver.TcpSocketCreator;
import org.apache.geode.internal.lang.JavaWorkarounds;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
+import org.apache.geode.services.module.ModuleService;
import org.apache.geode.util.internal.GeodeGlossary;
/**
@@ -918,19 +919,20 @@ public class GMSHealthMonitor<ID extends MemberIdentifier> implements HealthMoni
}
@Override
- public void init(Services<ID> s) throws MembershipConfigurationException {
+ public void init(Services<ID> service, ModuleService moduleService)
+ throws MembershipConfigurationException {
isStopping = false;
- services = s;
- memberTimeout = s.getConfig().getMemberTimeout();
- this.stats = services.getStatistics();
+ this.services = service;
+ memberTimeout = service.getConfig().getMemberTimeout();
+ this.stats = this.services.getStatistics();
- services.getMessenger().addHandler(HeartbeatRequestMessage.class,
+ this.services.getMessenger().addHandler(HeartbeatRequestMessage.class,
this::processMessage);
- services.getMessenger().addHandler(HeartbeatMessage.class,
+ this.services.getMessenger().addHandler(HeartbeatMessage.class,
this::processMessage);
- services.getMessenger().addHandler(SuspectMembersMessage.class,
+ this.services.getMessenger().addHandler(SuspectMembersMessage.class,
this::processMessage);
- services.getMessenger().addHandler(FinalCheckPassedMessage.class,
+ this.services.getMessenger().addHandler(FinalCheckPassedMessage.class,
this::processMessage);
}
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/interfaces/Service.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/interfaces/Service.java
index 4e6265c..4baa142 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/interfaces/Service.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/interfaces/Service.java
@@ -19,13 +19,14 @@ import org.apache.geode.distributed.internal.membership.api.MemberStartupExcepti
import org.apache.geode.distributed.internal.membership.api.MembershipConfigurationException;
import org.apache.geode.distributed.internal.membership.gms.GMSMembershipView;
import org.apache.geode.distributed.internal.membership.gms.Services;
+import org.apache.geode.services.module.ModuleService;
/**
* Membership services in GMS all implement this interface
*
*/
public interface Service<ID extends MemberIdentifier> {
- void init(Services<ID> s) throws MembershipConfigurationException;
+ void init(Services<ID> s, ModuleService moduleService) throws MembershipConfigurationException;
/**
* called after all services have been initialized with init() and all services are available via
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 2aea358..ed7b3cb 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -66,6 +66,7 @@ import org.apache.geode.distributed.internal.tcpserver.TcpClient;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.logging.internal.executors.LoggingThread;
+import org.apache.geode.services.module.ModuleService;
import org.apache.geode.util.internal.GeodeGlossary;
/**
@@ -124,178 +125,116 @@ public class GMSJoinLeave<ID extends MemberIdentifier> implements JoinLeave<ID>
private static final Logger logger = Services.getLogger();
private static final boolean ALLOW_OLD_VERSION_FOR_TESTING = Boolean
.getBoolean(GeodeGlossary.GEMFIRE_PREFIX + "allow_old_members_to_join_for_testing");
-
/**
- * the view ID where I entered into membership
+ * state of collected artifacts during discovery
*/
- private int birthViewId;
-
+ final SearchState<ID> searchState = new SearchState<>();
/**
- * my address
+ * a synch object that guards view installation
*/
- private ID localAddress;
-
- private Services<ID> services;
-
+ private final Object viewInstallationLock = new Object();
/**
- * have I connected to the distributed system?
+ * members who we have been declared dead in the current view
*/
- private volatile boolean isJoined;
-
+ private final Set<ID> removedMembers = new HashSet<>();
+ /**
+ * members who we've received a leave message from
+ **/
+ private final Set<ID> leftMembers = new HashSet<>();
+ /**
+ * a list of join/leave/crashes
+ */
+ private final List<AbstractGMSMessage<ID>> viewRequests = new LinkedList<>();
+ /**
+ * collects the response to a join request
+ */
+ private final JoinResponseMessage<ID>[] joinResponse = new JoinResponseMessage[1];
+ /**
+ * for messaging locator
+ */
+ private final TcpClient locatorClient;
/**
* guarded by viewInstallationLock
*/
@VisibleForTesting
volatile boolean isCoordinator;
-
- /**
- * a synch object that guards view installation
- */
- private final Object viewInstallationLock = new Object();
-
/**
* the currently installed view. Guarded by viewInstallationLock
*/
@VisibleForTesting
volatile GMSMembershipView<ID> currentView;
-
/**
- * the previous view
- **/
- private volatile GMSMembershipView<ID> previousView;
-
+ * the established request collection jitter. This can be overridden for testing with
+ * delayViewCreationForTest
+ */
+ long requestCollectionInterval = MEMBER_REQUEST_COLLECTION_INTERVAL;
/**
- * members who we have been declared dead in the current view
+ * collects responses to new views
*/
- private final Set<ID> removedMembers = new HashSet<>();
-
+ ViewReplyProcessor viewProcessor = new ViewReplyProcessor(false);
/**
- * members who we've received a leave message from
- **/
- private final Set<ID> leftMembers = new HashSet<>();
-
+ * collects responses to view preparation messages
+ */
+ ViewReplyProcessor prepareProcessor = new ViewReplyProcessor(true);
/**
- * a new view being installed
+ * a collection used to detect unit testing
*/
- private volatile GMSMembershipView<ID> preparedView;
-
+ Set<String> unitTesting = new HashSet<>();
/**
- * the last view that conflicted with view preparation
+ * the view where quorum was most recently lost
*/
- private GMSMembershipView<ID> lastConflictingView;
-
- private List<HostAndPort> locators;
-
+ GMSMembershipView<ID> quorumLostView;
/**
- * a list of join/leave/crashes
+ * the view ID where I entered into membership
*/
- private final List<AbstractGMSMessage<ID>> viewRequests = new LinkedList<>();
-
+ private int birthViewId;
/**
- * the established request collection jitter. This can be overridden for testing with
- * delayViewCreationForTest
+ * my address
*/
- long requestCollectionInterval = MEMBER_REQUEST_COLLECTION_INTERVAL;
-
+ private ID localAddress;
+ private Services<ID> services;
/**
- * collects the response to a join request
+ * have I connected to the distributed system?
*/
- private final JoinResponseMessage<ID>[] joinResponse = new JoinResponseMessage[1];
-
+ private volatile boolean isJoined;
/**
- * collects responses to new views
+ * the previous view
+ **/
+ private volatile GMSMembershipView<ID> previousView;
+ /**
+ * a new view being installed
*/
- ViewReplyProcessor viewProcessor = new ViewReplyProcessor(false);
-
+ private volatile GMSMembershipView<ID> preparedView;
/**
- * collects responses to view preparation messages
+ * the last view that conflicted with view preparation
*/
- ViewReplyProcessor prepareProcessor = new ViewReplyProcessor(true);
-
+ private GMSMembershipView<ID> lastConflictingView;
+ private List<HostAndPort> locators;
/**
* whether quorum checks can cause a forced-disconnect
*/
private boolean quorumRequired = false;
-
/**
* timeout in receiving view acknowledgement
*/
private long viewAckTimeout;
-
/**
* background thread that creates new membership views
*/
private ViewCreator viewCreator;
-
/**
* am I shutting down?
*/
private volatile boolean isStopping;
-
- /**
- * state of collected artifacts during discovery
- */
- final SearchState<ID> searchState = new SearchState<>();
-
- /**
- * a collection used to detect unit testing
- */
- Set<String> unitTesting = new HashSet<>();
-
/**
* a test hook to make this member unresponsive
*/
private volatile boolean playingDead;
- /**
- * the view where quorum was most recently lost
- */
- GMSMembershipView<ID> quorumLostView;
-
- /**
- * for messaging locator
- */
- private final TcpClient locatorClient;
-
public GMSJoinLeave(final TcpClient locatorClient) {
this.locatorClient = locatorClient;
}
- static class SearchState<ID extends MemberIdentifier> {
- public int joinedMembersContacted;
- Set<ID> alreadyTried = new HashSet<>();
- Set<ID> registrants = new HashSet<>();
- ID possibleCoordinator;
- int viewId = -100;
- int locatorsContacted = 0;
- boolean hasContactedAJoinedLocator;
- GMSMembershipView<ID> view;
- int lastFindCoordinatorInViewId = -1000;
- final Set<FindCoordinatorResponse<ID>> responses = new HashSet<>();
- public int responsesExpected;
-
- void cleanup() {
- alreadyTried.clear();
- possibleCoordinator = null;
- view = null;
- synchronized (responses) {
- responses.clear();
- }
- }
-
- public String toString() {
- StringBuffer sb = new StringBuffer(200);
- sb.append("locatorsContacted=").append(locatorsContacted)
- .append("; findInViewResponses=").append(joinedMembersContacted)
- .append("; alreadyTried=").append(alreadyTried).append("; registrants=")
- .append(registrants).append("; possibleCoordinator=").append(possibleCoordinator)
- .append("; viewId=").append(viewId).append("; hasContactedAJoinedLocator=")
- .append(hasContactedAJoinedLocator).append("; view=").append(view).append("; responses=")
- .append(responses);
- return sb.toString();
- }
- }
-
Object getViewInstallationLock() {
return viewInstallationLock;
}
@@ -1809,7 +1748,8 @@ public class GMSJoinLeave<ID extends MemberIdentifier> implements JoinLeave<ID>
}
@Override
- public void init(Services<ID> s) throws MembershipConfigurationException {
+ public void init(Services<ID> s, ModuleService moduleService)
+ throws MembershipConfigurationException {
this.services = s;
MembershipConfig config = services.getConfig();
@@ -1880,14 +1820,52 @@ public class GMSJoinLeave<ID extends MemberIdentifier> implements JoinLeave<ID>
return prepareProcessor.isWaiting();
}
+ static class SearchState<ID extends MemberIdentifier> {
+ final Set<FindCoordinatorResponse<ID>> responses = new HashSet<>();
+ public int joinedMembersContacted;
+ public int responsesExpected;
+ Set<ID> alreadyTried = new HashSet<>();
+ Set<ID> registrants = new HashSet<>();
+ ID possibleCoordinator;
+ int viewId = -100;
+ int locatorsContacted = 0;
+ boolean hasContactedAJoinedLocator;
+ GMSMembershipView<ID> view;
+ int lastFindCoordinatorInViewId = -1000;
+
+ void cleanup() {
+ alreadyTried.clear();
+ possibleCoordinator = null;
+ view = null;
+ synchronized (responses) {
+ responses.clear();
+ }
+ }
+
+ public String toString() {
+ StringBuffer sb = new StringBuffer(200);
+ sb.append("locatorsContacted=").append(locatorsContacted)
+ .append("; findInViewResponses=").append(joinedMembersContacted)
+ .append("; alreadyTried=").append(alreadyTried).append("; registrants=")
+ .append(registrants).append("; possibleCoordinator=").append(possibleCoordinator)
+ .append("; viewId=").append(viewId).append("; hasContactedAJoinedLocator=")
+ .append(hasContactedAJoinedLocator).append("; view=").append(view).append("; responses=")
+ .append(responses);
+ return sb.toString();
+ }
+ }
+
+ static class ViewAbandonedException extends Exception {
+ }
+
class ViewReplyProcessor {
- volatile int viewId = -1;
final Set<ID> notRepliedYet = new HashSet<>();
+ final boolean isPrepareViewProcessor;
+ final Set<ID> pendingRemovals = new HashSet<>();
+ volatile int viewId = -1;
GMSMembershipView<ID> conflictingView;
ID conflictingViewSender;
volatile boolean waiting;
- final boolean isPrepareViewProcessor;
- final Set<ID> pendingRemovals = new HashSet<>();
ViewReplyProcessor(boolean forPreparation) {
this.isPrepareViewProcessor = forPreparation;
@@ -2075,9 +2053,6 @@ public class GMSJoinLeave<ID extends MemberIdentifier> implements JoinLeave<ID>
volatile boolean testFlagForRemovalRequest = false;
// count of number of views abandoned due to conflicts
volatile int abandonedViews = 0;
- private boolean markViewCreatorForShutdown = false; // see GEODE-870
-
-
/**
* initial view to install. guarded by synch on ViewCreator
*/
@@ -2094,6 +2069,7 @@ public class GMSJoinLeave<ID extends MemberIdentifier> implements JoinLeave<ID>
* initial crashed members. guarded by synch on ViewCreator
*/
Set<ID> initialRemovals;
+ private boolean markViewCreatorForShutdown = false; // see GEODE-870
ViewCreator(String name) {
super(name);
@@ -2807,8 +2783,4 @@ public class GMSJoinLeave<ID extends MemberIdentifier> implements JoinLeave<ID>
}
}
-
-
- static class ViewAbandonedException extends Exception {
- }
}
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 7cff066..6007ec2 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -49,7 +49,6 @@ import java.util.stream.Collectors;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.logging.log4j.Logger;
-import org.jboss.modules.ModuleClassLoader;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.Event;
@@ -96,6 +95,8 @@ import org.apache.geode.internal.serialization.StaticSerialization;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.internal.serialization.VersionedDataInputStream;
import org.apache.geode.logging.internal.OSProcess;
+import org.apache.geode.services.module.ModuleService;
+import org.apache.geode.services.result.ModuleServiceResult;
import org.apache.geode.util.internal.GeodeGlossary;
@@ -129,29 +130,19 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger<
private static final short JGROUPS_TYPE_JGADDRESS = 2000;
private static final short JGROUPS_PROTOCOL_TRANSPORT = 1000;
- protected String jgStackConfig;
-
- JChannel myChannel;
- ID localAddress;
- JGAddress jgAddress;
- private Services<ID> services;
-
- public JGroupsMessenger() {}
-
- /** handlers that receive certain classes of messages instead of the Manager */
- private final Map<Class<?>, MessageHandler<?>> handlers = new ConcurrentHashMap<>();
-
- private volatile GMSMembershipView<ID> view;
+ static {
+ // register classes that we've added to jgroups that are put on the wire
+ // or need a header ID
+ ClassConfigurator.add(JGROUPS_TYPE_JGADDRESS, JGAddress.class);
+ ClassConfigurator.addProtocol(JGROUPS_PROTOCOL_TRANSPORT, Transport.class);
+ }
protected final GMSPingPonger pingPonger = new GMSPingPonger();
-
protected final AtomicLong pongsReceived = new AtomicLong(0);
-
/** tracks multicast messages that have been scheduled for processing */
protected final Map<ID, MessageTracker> scheduledMcastSeqnos = new HashMap<>();
-
- protected short nackack2HeaderId;
-
+ /** handlers that receive certain classes of messages instead of the Manager */
+ private final Map<Class<?>, MessageHandler<?>> handlers = new ConcurrentHashMap<>();
/**
* A set that contains addresses that we have logged JGroups IOExceptions for in the current
* membership view and possibly initiated suspect processing. This reduces the amount of suspect
@@ -159,14 +150,18 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger<
*/
private final Set<Address> addressesWithIoExceptionsProcessed =
Collections.synchronizedSet(new HashSet<>());
-
- static {
- // register classes that we've added to jgroups that are put on the wire
- // or need a header ID
- ClassConfigurator.add(JGROUPS_TYPE_JGADDRESS, JGAddress.class);
- ClassConfigurator.addProtocol(JGROUPS_PROTOCOL_TRANSPORT, Transport.class);
- }
-
+ protected String jgStackConfig;
+ protected short nackack2HeaderId;
+ /**
+ * The JGroupsReceiver is handed messages by the JGroups Channel. It is responsible
+ * for deserializating and dispatching those messages to the appropriate handler
+ */
+ protected JGroupsReceiver jgroupsReceiver;
+ JChannel myChannel;
+ ID localAddress;
+ JGAddress jgAddress;
+ private Services<ID> services;
+ private volatile GMSMembershipView<ID> view;
private GMSEncrypt<ID> encrypt;
/**
@@ -175,12 +170,10 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger<
* to be delivered to handlers after membership services have been rebuilt.
*/
private Queue<org.jgroups.Message> queuedMessagesFromReconnect;
+ private AtomicInteger requestId = new AtomicInteger((new Random().nextInt()));
+ private HashMap<Integer, ID> requestIdVsRecipients = new HashMap<>();
- /**
- * The JGroupsReceiver is handed messages by the JGroups Channel. It is responsible
- * for deserializating and dispatching those messages to the appropriate handler
- */
- protected JGroupsReceiver jgroupsReceiver;
+ public JGroupsMessenger() {}
public static void setChannelReceiver(JChannel channel, Receiver r) {
try {
@@ -198,7 +191,8 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger<
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
- public void init(Services<ID> s) throws MembershipConfigurationException {
+ public void init(Services<ID> s, ModuleService moduleService)
+ throws MembershipConfigurationException {
this.services = s;
MembershipConfig config = services.getConfig();
@@ -206,58 +200,17 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger<
boolean enableNetworkPartitionDetection = config.isNetworkPartitionDetectionEnabled();
System.setProperty("jgroups.resolve_dns", String.valueOf(!enableNetworkPartitionDetection));
- InputStream is = null;
-
- String r;
- if (config.isMulticastEnabled()) {
- r = JGROUPS_MCAST_CONFIG_FILE_NAME;
- } else {
- r = DEFAULT_JGROUPS_TCP_CONFIG;
- }
- ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
- if (contextClassLoader != null) {
- is = contextClassLoader.getResourceAsStream(r);
- }
-
- if (is == null) {
- is = getClass().getResourceAsStream(r);
- }
- if (is == null) {
- is = ClassLoader.getSystemResourceAsStream(r);
- }
-
- if (is == null) {
- if (this.getClass().getClassLoader() instanceof ModuleClassLoader) {
- ModuleClassLoader classLoader = (ModuleClassLoader) this.getClass().getClassLoader();
- is = classLoader.findResourceAsStream(r, false);
- }
- }
-
- if (is == null) {
- throw new MembershipConfigurationException(
- String.format("Cannot find %s", r));
- }
-
- if (this.getClass().getClassLoader() instanceof ModuleClassLoader) {
- ModuleClassLoader classLoader = (ModuleClassLoader) this.getClass().getClassLoader();
+ String properties;
- }
+ ModuleServiceResult<List<InputStream>> resourceStreamResult = moduleService
+ .findResourceAsStream(config.isMulticastEnabled()
+ ? JGROUPS_MCAST_CONFIG_FILE_NAME
+ : DEFAULT_JGROUPS_TCP_CONFIG);
- String properties;
- try {
- StringBuilder sb = new StringBuilder(3000);
- BufferedReader br;
- br = new BufferedReader(new InputStreamReader(is, "US-ASCII"));
- String input;
- while ((input = br.readLine()) != null) {
- sb.append(input);
- }
- br.close();
- properties = sb.toString();
- } catch (Exception ex) {
- throw new MembershipConfigurationException(
- "An Exception was thrown while reading JGroups config.",
- ex);
+ if (resourceStreamResult.isSuccessful()) {
+ properties = getPropertiesFromInputStream(resourceStreamResult.getMessage().get(0));
+ } else {
+ throw new MembershipConfigurationException(resourceStreamResult.getErrorMessage());
}
if (properties.startsWith("<!--")) {
@@ -334,6 +287,26 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger<
}
}
+ private String getPropertiesFromInputStream(InputStream inputStream)
+ throws MembershipConfigurationException {
+
+ try {
+ StringBuilder sb = new StringBuilder(3000);
+ BufferedReader br;
+ br = new BufferedReader(new InputStreamReader(inputStream, "US-ASCII"));
+ String input;
+ while ((input = br.readLine()) != null) {
+ sb.append(input);
+ }
+ br.close();
+ return sb.toString();
+ } catch (Exception ex) {
+ throw new MembershipConfigurationException(
+ "An Exception was thrown while reading JGroups config.",
+ ex);
+ }
+ }
+
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
@@ -491,7 +464,6 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger<
}
}
-
/**
* If JGroups is unable to send a message it may mean that the network is down. If so we need to
* initiate suspect processing on the recipient.
@@ -1012,7 +984,6 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger<
msg.setTransientFlag(org.jgroups.Message.TransientFlag.DONT_LOOPBACK);
}
-
/**
* deserialize a jgroups payload. If it's a DistributionMessage find the ID of the sender and
* establish it as the message's sender
@@ -1239,7 +1210,6 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger<
return this.services.getJoinLeave().getMemberID(jgId);
}
-
@Override
public void emergencyClose() {
this.view = null;
@@ -1271,6 +1241,89 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger<
return qc;
}
+ @Override
+ public Set<ID> send(Message<ID> msg, GMSMembershipView<ID> alternateView) {
+ if (this.encrypt != null) {
+ this.encrypt.installView(alternateView);
+ }
+ return send(msg, true);
+ }
+
+ @Override
+ public byte[] getPublicKey(ID mbr) {
+ if (encrypt != null) {
+ return encrypt.getPublicKey(mbr);
+ }
+ return null;
+ }
+
+ @Override
+ public void setPublicKey(byte[] publickey, ID mbr) {
+ if (encrypt != null) {
+ logger.debug("Setting PK for member " + mbr);
+ encrypt.setPublicKey(publickey, mbr);
+ }
+ }
+
+ @Override
+ public byte[] getClusterSecretKey() {
+ if (encrypt != null) {
+ return encrypt.getClusterSecretKey();
+ }
+ return null;
+ }
+
+ @Override
+ public void setClusterSecretKey(byte[] clusterSecretKey) {
+ if (encrypt != null) {
+ logger.debug("Setting cluster key");
+ encrypt.setClusterKey(clusterSecretKey);
+ }
+ }
+
+ ID getRequestedMember(int requestId) {
+ return requestIdVsRecipients.remove(requestId);
+ }
+
+ void addRequestId(int requestId, ID mbr) {
+ requestIdVsRecipients.put(requestId, mbr);
+ }
+
+ @Override
+ public int getRequestId() {
+ return requestId.incrementAndGet();
+ }
+
+ @Override
+ public void initClusterKey() {
+ if (encrypt != null) {
+ try {
+ logger.info("Initializing cluster key");
+ encrypt.initClusterSecretKey();
+ } catch (Exception e) {
+ throw new RuntimeException("unable to create cluster key ", e);
+ }
+ }
+ }
+
+ static class MessageTracker {
+ long highestSeqno;
+
+ MessageTracker(long seqno) {
+ highestSeqno = seqno;
+ }
+
+ long get() {
+ return highestSeqno;
+ }
+
+ void record(long seqno) {
+ if (seqno > highestSeqno) {
+ highestSeqno = seqno;
+ }
+ }
+ }
+
/**
* JGroupsReceiver receives incoming JGroups messages and passes them to a handler. It may be
* accessed through JChannel.getReceiver().
@@ -1394,90 +1447,4 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger<
return (MessageHandler<Message<ID>>) h;
}
}
-
- @Override
- public Set<ID> send(Message<ID> msg, GMSMembershipView<ID> alternateView) {
- if (this.encrypt != null) {
- this.encrypt.installView(alternateView);
- }
- return send(msg, true);
- }
-
- @Override
- public byte[] getPublicKey(ID mbr) {
- if (encrypt != null) {
- return encrypt.getPublicKey(mbr);
- }
- return null;
- }
-
- @Override
- public void setPublicKey(byte[] publickey, ID mbr) {
- if (encrypt != null) {
- logger.debug("Setting PK for member " + mbr);
- encrypt.setPublicKey(publickey, mbr);
- }
- }
-
- @Override
- public void setClusterSecretKey(byte[] clusterSecretKey) {
- if (encrypt != null) {
- logger.debug("Setting cluster key");
- encrypt.setClusterKey(clusterSecretKey);
- }
- }
-
- @Override
- public byte[] getClusterSecretKey() {
- if (encrypt != null) {
- return encrypt.getClusterSecretKey();
- }
- return null;
- }
-
- private AtomicInteger requestId = new AtomicInteger((new Random().nextInt()));
- private HashMap<Integer, ID> requestIdVsRecipients = new HashMap<>();
-
- ID getRequestedMember(int requestId) {
- return requestIdVsRecipients.remove(requestId);
- }
-
- void addRequestId(int requestId, ID mbr) {
- requestIdVsRecipients.put(requestId, mbr);
- }
-
- @Override
- public int getRequestId() {
- return requestId.incrementAndGet();
- }
-
- @Override
- public void initClusterKey() {
- if (encrypt != null) {
- try {
- logger.info("Initializing cluster key");
- encrypt.initClusterSecretKey();
- } catch (Exception e) {
- throw new RuntimeException("unable to create cluster key ", e);
- }
- }
- }
-
- static class MessageTracker {
- long highestSeqno;
-
- MessageTracker(long seqno) {
- highestSeqno = seqno;
- }
-
- long get() {
- return highestSeqno;
- }
-
- void record(long seqno) {
- if (seqno > highestSeqno) {
- highestSeqno = seqno;
- }
- }
- }
}
diff --git a/geode-modules/src/main/java/org/apache/geode/services/module/impl/JBossModuleServiceImpl.java b/geode-modules/src/main/java/org/apache/geode/services/module/impl/JBossModuleServiceImpl.java
index 505103f..32260bd 100644
--- a/geode-modules/src/main/java/org/apache/geode/services/module/impl/JBossModuleServiceImpl.java
+++ b/geode-modules/src/main/java/org/apache/geode/services/module/impl/JBossModuleServiceImpl.java
@@ -15,6 +15,7 @@
package org.apache.geode.services.module.impl;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -236,4 +237,21 @@ public class JBossModuleServiceImpl implements ModuleService {
return Success.of(result);
}
+
+ @Override
+ public ModuleServiceResult<List<InputStream>> findResourceAsStream(String resourceFile) {
+ List<InputStream> results = new ArrayList<>();
+ modules.values().forEach(module -> {
+ InputStream resourceAsStream =
+ module.getClassLoader().findResourceAsStream(resourceFile, false);
+
+ if (resourceAsStream != null) {
+ results.add(resourceAsStream);
+ }
+ });
+
+ return results.isEmpty()
+ ? Failure.of(String.format("No resource for path: %s could be found", resourceFile))
+ : Success.of(results);
+ }
}