You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bu...@apache.org on 2020/01/30 06:08:34 UTC
[geode] branch develop updated: GEODE-7652: MembershipBuilder lets
you set the local locator (#4614)
This is an automated email from the ASF dual-hosted git repository.
burcham pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new d294764 GEODE-7652: MembershipBuilder lets you set the local locator (#4614)
d294764 is described below
commit d2947640af8c6bf16a650348bf09a28d11d27997
Author: Bill Burcham <bb...@pivotal.io>
AuthorDate: Wed Jan 29 22:08:07 2020 -0800
GEODE-7652: MembershipBuilder lets you set the local locator (#4614)
---
...ReconnectWithClusterConfigurationDUnitTest.java | 1 -
.../apache/geode/distributed/LocatorDUnitTest.java | 13 +---
.../internal/membership/MembershipJUnitTest.java | 48 +++++++------
.../membership/gms/MembershipOnlyTest.java | 17 ++---
.../locator/GMSLocatorRecoveryIntegrationTest.java | 5 +-
.../internal/ClusterDistributionManager.java | 18 +++--
.../distributed/internal/DistributionImpl.java | 26 +++-----
.../internal/InternalDistributedSystem.java | 78 ++++++++++++++++++----
.../distributed/internal/InternalLocator.java | 17 +++--
.../gms/locator/GMSLocatorIntegrationTest.java | 8 +--
.../internal/membership/api/LifecycleListener.java | 7 --
.../internal/membership/api/MembershipBuilder.java | 5 +-
.../internal/membership/gms/GMSMembership.java | 1 -
.../membership/gms/LifecycleListenerNoOp.java | 5 --
.../membership/gms/MembershipBuilderImpl.java | 16 ++++-
.../internal/membership/gms/Services.java | 25 +++++--
.../membership/gms/locator/GMSLocator.java | 19 +++---
.../gms/locator/MembershipLocatorImpl.java | 17 ++++-
18 files changed, 201 insertions(+), 125 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectWithClusterConfigurationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectWithClusterConfigurationDUnitTest.java
index b51d7f2..6a7c262 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectWithClusterConfigurationDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectWithClusterConfigurationDUnitTest.java
@@ -91,7 +91,6 @@ public class ReconnectWithClusterConfigurationDUnitTest implements Serializable
locator = Locator.startLocatorAndDS(locatorPorts[locatorNumber], new File(""), props);
system = locator.getDistributedSystem();
cache = ((InternalLocator) locator).getCache();
- ReconnectDUnitTest.savedSystem = locator.getDistributedSystem();
IgnoredException.addIgnoredException(
"org.apache.geode.ForcedDisconnectException||Possible loss of quorum");
} catch (IOException e) {
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/LocatorDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/LocatorDUnitTest.java
index 2e6bf6b..348fdad 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/distributed/LocatorDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/LocatorDUnitTest.java
@@ -1357,17 +1357,8 @@ public class LocatorDUnitTest implements Serializable {
public void testHostingMultipleLocators() throws Exception {
Locator.startLocator(port1, null);
- try {
- Locator.startLocator(port2, null);
- fail("expected second locator start to fail.");
- } catch (IllegalStateException expected) {
- }
-
- String locators = hostName + "[" + port1 + "]," + hostName + "[" + port2 + "]";
-
- Properties props = getBasicProperties(locators);
-
- getConnectedDistributedSystem(props);
+ assertThatThrownBy(() -> Locator.startLocator(port2, null))
+ .isInstanceOf(IllegalStateException.class);
}
/**
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/MembershipJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/MembershipJUnitTest.java
index d70e603..3d21f69 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/MembershipJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/MembershipJUnitTest.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -46,12 +45,10 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.geode.distributed.ConfigurationProperties;
-import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionConfigImpl;
-import org.apache.geode.distributed.internal.DistributionImpl;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.distributed.internal.SerialAckedMessage;
@@ -66,6 +63,7 @@ import org.apache.geode.distributed.internal.membership.api.Membership;
import org.apache.geode.distributed.internal.membership.api.MembershipBuilder;
import org.apache.geode.distributed.internal.membership.api.MembershipConfig;
import org.apache.geode.distributed.internal.membership.api.MembershipListener;
+import org.apache.geode.distributed.internal.membership.api.MembershipLocator;
import org.apache.geode.distributed.internal.membership.api.MembershipView;
import org.apache.geode.distributed.internal.membership.api.MessageListener;
import org.apache.geode.distributed.internal.tcpserver.TcpClient;
@@ -131,7 +129,7 @@ public class MembershipJUnitTest {
throws Exception {
Membership<InternalDistributedMember> m1 = null, m2 = null;
- Locator l = null;
+ InternalLocator internalLocator = null;
// int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort();
try {
@@ -142,8 +140,9 @@ public class MembershipJUnitTest {
// this locator will hook itself up with the first Membership
// to be created
- l = InternalLocator.startLocator(port, new File(""), null, null, localHost, false,
- new Properties(), null, temporaryFolder.getRoot().toPath());
+ internalLocator =
+ InternalLocator.startLocator(port, new File(""), null, null, localHost, false,
+ new Properties(), null, temporaryFolder.getRoot().toPath());
// create configuration objects
Properties nonDefault = new Properties();
@@ -159,11 +158,14 @@ public class MembershipJUnitTest {
new RemoteTransportConfig(config, ClusterDistributionManager.LOCATOR_DM_TYPE);
// start the first membership manager
- m1 = createMembershipManager(config, transport).getLeft();
+ final MembershipLocator<InternalDistributedMember> membershipLocator =
+ internalLocator.getMembershipLocator();
+
+ m1 = createMembershipManager(config, transport, membershipLocator).getLeft();
// start the second membership manager
final Pair<Membership, MessageListener> pair =
- createMembershipManager(config, transport);
+ createMembershipManager(config, transport, membershipLocator);
m2 = pair.getLeft();
final MessageListener listener2 = pair.getRight();
@@ -241,15 +243,16 @@ public class MembershipJUnitTest {
if (m1 != null) {
m1.shutdown();
}
- if (l != null) {
- l.stop();
+ if (internalLocator != null) {
+ internalLocator.stop();
}
}
}
private Pair<Membership, MessageListener> createMembershipManager(
final DistributionConfigImpl config,
- final RemoteTransportConfig transport) throws MemberStartupException {
+ final RemoteTransportConfig transport,
+ final MembershipLocator<InternalDistributedMember> locator) throws MemberStartupException {
final MembershipListener<InternalDistributedMember> listener = mock(MembershipListener.class);
final MessageListener<InternalDistributedMember> messageListener = mock(MessageListener.class);
final DMStats stats1 = mock(DMStats.class);
@@ -290,6 +293,7 @@ public class MembershipJUnitTest {
final Membership<InternalDistributedMember> m1 =
MembershipBuilder.<InternalDistributedMember>newMembershipBuilder(
socketCreator, locatorClient, serializer, memberIdentifierFactory)
+ .setMembershipLocator(locator)
.setAuthenticator(authenticator)
.setStatistics(stats1)
.setMessageListener(messageListener)
@@ -297,10 +301,6 @@ public class MembershipJUnitTest {
.setConfig(new ServiceConfig(transport, config))
.setLifecycleListener(lifeCycleListener)
.create();
- doAnswer(invocation -> {
- DistributionImpl.connectLocatorToServices(m1);
- return null;
- }).when(lifeCycleListener).started();
m1.start();
m1.startEventProcessing();
return Pair.of(m1, messageListener);
@@ -319,7 +319,7 @@ public class MembershipJUnitTest {
public void testLocatorAndTwoServersJoinUsingDiffeHellman() throws Exception {
Membership<InternalDistributedMember> m1 = null, m2 = null;
- Locator l = null;
+ InternalLocator internalLocator = null;
int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort();
try {
@@ -331,8 +331,9 @@ public class MembershipJUnitTest {
p.setProperty(ConfigurationProperties.SECURITY_UDP_DHALGO, "AES:128");
// this locator will hook itself up with the first Membership
// to be created
- l = InternalLocator.startLocator(port, new File(""), null, null, localHost, false, p, null,
- temporaryFolder.getRoot().toPath());
+ internalLocator =
+ InternalLocator.startLocator(port, new File(""), null, null, localHost, false, p, null,
+ temporaryFolder.getRoot().toPath());
// create configuration objects
Properties nonDefault = new Properties();
@@ -349,11 +350,14 @@ public class MembershipJUnitTest {
new RemoteTransportConfig(config, ClusterDistributionManager.LOCATOR_DM_TYPE);
// start the first membership manager
- m1 = createMembershipManager(config, transport).getLeft();
+ final MembershipLocator<InternalDistributedMember> membershipLocator =
+ internalLocator.getMembershipLocator();
+
+ m1 = createMembershipManager(config, transport, membershipLocator).getLeft();
// start the second membership manager
final Pair<Membership, MessageListener> pair =
- createMembershipManager(config, transport);
+ createMembershipManager(config, transport, membershipLocator);
m2 = pair.getLeft();
final MessageListener listener2 = pair.getRight();
@@ -423,8 +427,8 @@ public class MembershipJUnitTest {
if (m1 != null) {
m1.disconnect(false);
}
- if (l != null) {
- l.stop();
+ if (internalLocator != null) {
+ internalLocator.stop();
}
}
}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipOnlyTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipOnlyTest.java
index c1ba5bd..9718e0b 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipOnlyTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipOnlyTest.java
@@ -16,7 +16,6 @@ package org.apache.geode.distributed.internal.membership.gms;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import java.io.IOException;
@@ -96,7 +95,6 @@ public class MembershipOnlyTest {
assertThat(membership.getView().getMembers()).hasSize(1);
}
-
@Test
public void twoMembersCanConnect() throws MemberStartupException {
Membership<MemberIdentifierImpl> member1 = startMember("member1", membershipLocator);
@@ -137,20 +135,15 @@ public class MembershipOnlyTest {
final Membership<MemberIdentifierImpl> membership =
MembershipBuilder.<MemberIdentifierImpl>newMembershipBuilder(
- socketCreator, locatorClient, dsfidSerializer, memberIdFactory)
+ socketCreator,
+ locatorClient,
+ dsfidSerializer,
+ memberIdFactory)
+ .setMembershipLocator(membershipLocator)
.setConfig(config)
.setLifecycleListener(lifeCycleListener)
.create();
- // TODO - the membership *must* be installed in the locator at this special
- // point during membership startup for the start to succeed
- if (embeddedLocator != null) {
- doAnswer(invocation -> {
- embeddedLocator.setMembership(membership);
- return null;
- }).when(lifeCycleListener).started();
- }
-
membership.start();
membership.startEventProcessing();
return membership;
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorRecoveryIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorRecoveryIntegrationTest.java
index dcc49d4..94a7e7e 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorRecoveryIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorRecoveryIntegrationTest.java
@@ -41,7 +41,6 @@ import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.apache.geode.DataSerializer;
-import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.Distribution;
@@ -78,7 +77,7 @@ public class GMSLocatorRecoveryIntegrationTest {
private File stateFile;
private GMSLocator gmsLocator;
- private Locator locator;
+ private InternalLocator locator;
private DSFIDSerializer serializer;
private Distribution distribution;
@@ -193,7 +192,7 @@ public class GMSLocatorRecoveryIntegrationTest {
distribution =
new DistributionImpl(mockClusterDistributionManager, transport, mockSystem, mockListener,
- mockMessageListener);
+ mockMessageListener, locator.getMembershipLocator());
distribution.start();
GMSLocator gmsLocator = new GMSLocator(localHost,
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
index 4d765ac..0ece626 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
@@ -67,6 +67,7 @@ import org.apache.geode.distributed.internal.membership.api.MemberDisconnectedEx
import org.apache.geode.distributed.internal.membership.api.MemberIdentifier;
import org.apache.geode.distributed.internal.membership.api.MemberIdentifierFactory;
import org.apache.geode.distributed.internal.membership.api.Membership;
+import org.apache.geode.distributed.internal.membership.api.MembershipLocator;
import org.apache.geode.distributed.internal.membership.api.MembershipView;
import org.apache.geode.distributed.internal.membership.api.Message;
import org.apache.geode.internal.Assert;
@@ -295,7 +296,8 @@ public class ClusterDistributionManager implements DistributionManager {
*
* @param system The distributed system to which this distribution manager will send messages.
*/
- static ClusterDistributionManager create(InternalDistributedSystem system) {
+ static ClusterDistributionManager create(InternalDistributedSystem system,
+ final MembershipLocator<InternalDistributedMember> membershipLocator) {
ClusterDistributionManager distributionManager = null;
boolean beforeJoined = true;
@@ -322,7 +324,8 @@ public class ClusterDistributionManager implements DistributionManager {
long start = System.currentTimeMillis();
distributionManager =
- new ClusterDistributionManager(system, transport, system.getAlertingService());
+ new ClusterDistributionManager(system, transport, system.getAlertingService(),
+ membershipLocator);
distributionManager.assertDistributionManagerType();
beforeJoined = false; // we have now joined the system
@@ -426,7 +429,8 @@ public class ClusterDistributionManager implements DistributionManager {
*
*/
private ClusterDistributionManager(RemoteTransportConfig transport,
- InternalDistributedSystem system, AlertingService alertingService) {
+ InternalDistributedSystem system, AlertingService alertingService,
+ MembershipLocator<InternalDistributedMember> locator) {
this.system = system;
this.transport = transport;
@@ -459,7 +463,7 @@ public class ClusterDistributionManager implements DistributionManager {
DMListener listener = new DMListener(this);
distribution = DistributionImpl
.createDistribution(this, transport, system, listener,
- this::handleIncomingDMsg);
+ this::handleIncomingDMsg, locator);
sb.append(System.currentTimeMillis() - start);
@@ -488,8 +492,10 @@ public class ClusterDistributionManager implements DistributionManager {
* @param system The distributed system to which this distribution manager will send messages.
*/
private ClusterDistributionManager(InternalDistributedSystem system,
- RemoteTransportConfig transport, AlertingService alertingService) {
- this(transport, system, alertingService);
+ RemoteTransportConfig transport,
+ AlertingService alertingService,
+ final MembershipLocator<InternalDistributedMember> membershipLocator) {
+ this(transport, system, alertingService, membershipLocator);
boolean finishedConstructor = false;
try {
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
index 482fe9d..b81d2ac 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
@@ -57,6 +57,7 @@ import org.apache.geode.distributed.internal.membership.api.MembershipBuilder;
import org.apache.geode.distributed.internal.membership.api.MembershipClosedException;
import org.apache.geode.distributed.internal.membership.api.MembershipConfigurationException;
import org.apache.geode.distributed.internal.membership.api.MembershipListener;
+import org.apache.geode.distributed.internal.membership.api.MembershipLocator;
import org.apache.geode.distributed.internal.membership.api.MembershipStatistics;
import org.apache.geode.distributed.internal.membership.api.MembershipView;
import org.apache.geode.distributed.internal.membership.api.Message;
@@ -109,9 +110,11 @@ public class DistributionImpl implements Distribution {
public DistributionImpl(final ClusterDistributionManager clusterDistributionManager,
- final RemoteTransportConfig transport, final InternalDistributedSystem system,
+ final RemoteTransportConfig transport,
+ final InternalDistributedSystem system,
final MembershipListener<InternalDistributedMember> listener,
- final MessageListener<InternalDistributedMember> messageListener) {
+ final MessageListener<InternalDistributedMember> messageListener,
+ final MembershipLocator<InternalDistributedMember> locator) {
this.clusterDistributionManager = clusterDistributionManager;
this.transportConfig = transport;
this.tcpDisabled = transportConfig.isTcpDisabled();
@@ -137,6 +140,7 @@ public class DistributionImpl implements Distribution {
locatorClient,
InternalDataSerializer.getDSFIDSerializer(),
new ClusterDistributionManager.ClusterDistributionManagerIDFactory())
+ .setMembershipLocator(locator)
.setAuthenticator(
new GMSAuthenticator(system.getSecurityProperties(), system.getSecurityService(),
system.getSecurityLogWriter(), system.getInternalLogWriter()))
@@ -156,14 +160,6 @@ public class DistributionImpl implements Distribution {
}
}
- public static void connectLocatorToServices(Membership<InternalDistributedMember> membership) {
- // see if a locator was started and put it in GMS Services
- InternalLocator l = (InternalLocator) Locator.getLocator();
- if (l != null && l.getMembershipLocator() != null) {
- l.getMembershipLocator().setMembership(membership);
- }
- }
-
@Override
public Membership<InternalDistributedMember> getMembership() {
return membership;
@@ -217,11 +213,12 @@ public class DistributionImpl implements Distribution {
ClusterDistributionManager clusterDistributionManager, RemoteTransportConfig transport,
InternalDistributedSystem system,
MembershipListener<InternalDistributedMember> listener,
- MessageListener<InternalDistributedMember> messageListener) {
+ MessageListener<InternalDistributedMember> messageListener,
+ final MembershipLocator<InternalDistributedMember> locator) {
DistributionImpl distribution =
new DistributionImpl(clusterDistributionManager, transport, system, listener,
- messageListener);
+ messageListener, locator);
distribution.start();
return distribution;
}
@@ -916,11 +913,6 @@ public class DistributionImpl implements Distribution {
}
@Override
- public void started() {
- connectLocatorToServices(distribution.getMembership());
- }
-
- @Override
public void forcedDisconnect() {
// stop server locators immediately since they may not have correct
// information. This has caused client failures in bridge/wan
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
index fa53678..2ed25ae 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
@@ -77,6 +77,7 @@ import org.apache.geode.distributed.DurableClientAttributes;
import org.apache.geode.distributed.internal.locks.GrantorRequestProcessor;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.api.MembershipInformation;
+import org.apache.geode.distributed.internal.membership.api.MembershipLocator;
import org.apache.geode.distributed.internal.membership.api.QuorumChecker;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.InternalDataSerializer;
@@ -198,6 +199,23 @@ public class InternalDistributedSystem extends DistributedSystem
private final AtomicReference<ClusterAlertMessaging> clusterAlertMessaging =
new AtomicReference<>();
+ // captured in initialize() when starting so that we can hand it to new instance when restarting
+ private MembershipLocator<InternalDistributedMember> membershipLocator;
+
+ /**
+ * If the experimental multiple-system feature is enabled, always create a new system.
+ *
+ * <p>
+ * Otherwise, create a new InternalDistributedSystem with the given properties, or connect to an
+ * existing one with the same properties.
+ */
+ public static InternalDistributedSystem connectInternal(
+ Properties config,
+ SecurityConfig securityConfig,
+ MetricsService.Builder metricsSessionBuilder) {
+ return connectInternal(config, securityConfig, metricsSessionBuilder, null);
+ }
+
/**
* If the experimental multiple-system feature is enabled, always create a new system.
*
@@ -205,8 +223,11 @@ public class InternalDistributedSystem extends DistributedSystem
* Otherwise, create a new InternalDistributedSystem with the given properties, or connect to an
* existing one with the same properties.
*/
- public static InternalDistributedSystem connectInternal(Properties config,
- SecurityConfig securityConfig, MetricsService.Builder metricsSessionBuilder) {
+ public static InternalDistributedSystem connectInternal(
+ Properties config,
+ SecurityConfig securityConfig,
+ MetricsService.Builder metricsSessionBuilder,
+ final MembershipLocator<InternalDistributedMember> locator) {
if (config == null) {
config = new Properties();
}
@@ -214,6 +235,7 @@ public class InternalDistributedSystem extends DistributedSystem
if (Boolean.getBoolean(ALLOW_MULTIPLE_SYSTEMS_PROPERTY)) {
return new Builder(config, metricsSessionBuilder)
.setSecurityConfig(securityConfig)
+ .setLocator(locator)
.build();
}
@@ -264,6 +286,7 @@ public class InternalDistributedSystem extends DistributedSystem
// Make a new connection to the distributed system
InternalDistributedSystem newSystem = new Builder(config, metricsSessionBuilder)
.setSecurityConfig(securityConfig)
+ .setLocator(locator)
.build();
addSystem(newSystem);
return newSystem;
@@ -651,7 +674,9 @@ public class InternalDistributedSystem extends DistributedSystem
* Initializes this connection to a distributed system with the current configuration state.
*/
private void initialize(SecurityManager securityManager, PostProcessor postProcessor,
- MetricsService.Builder metricsServiceBuilder) {
+ MetricsService.Builder metricsServiceBuilder,
+ final MembershipLocator<InternalDistributedMember> membershipLocatorArg) {
+
if (originalConfig.getLocators().equals("")) {
if (originalConfig.getMcastPort() != 0) {
throw new GemFireConfigException("The " + LOCATORS + " attribute can not be empty when the "
@@ -740,7 +765,7 @@ public class InternalDistributedSystem extends DistributedSystem
}
try {
- startInitLocator();
+ startInitLocator(membershipLocatorArg);
} catch (InterruptedException e) {
throw new SystemConnectException("Startup has been interrupted", e);
}
@@ -751,12 +776,13 @@ public class InternalDistributedSystem extends DistributedSystem
if (!isLoner) {
try {
- dm = ClusterDistributionManager.create(this);
+ dm = ClusterDistributionManager.create(this, membershipLocator);
// fix bug #46324
if (InternalLocator.hasLocator()) {
- InternalLocator locator = InternalLocator.getLocator();
+ InternalLocator internalLocator = InternalLocator.getLocator();
getDistributionManager().addHostedLocators(getDistributedMember(),
- InternalLocator.getLocatorStrings(), locator.isSharedConfigurationEnabled());
+ InternalLocator.getLocatorStrings(),
+ internalLocator.isSharedConfigurationEnabled());
}
} finally {
if (dm == null && quorumChecker != null) {
@@ -780,7 +806,7 @@ public class InternalDistributedSystem extends DistributedSystem
}
if (attemptingToReconnect && (startedLocator == null)) {
try {
- startInitLocator();
+ startInitLocator(membershipLocatorArg);
} catch (InterruptedException e) {
throw new SystemConnectException("Startup has been interrupted", e);
}
@@ -853,11 +879,25 @@ public class InternalDistributedSystem extends DistributedSystem
}
/**
+ * Starts a locator in this JVM iff the distribution config wants one started.
+ *
+ * @return the membershipLocatorArg if the distribution config has no locator specified;
+ * otherwise starts a new InternalLocator and returns its associated MembershipLocator
+ *
* @since GemFire 5.7
+ * @param membershipLocatorArg on initial startup, a MembershipLocator provided explicitly by
+ * a caller, or null; on restart, the old MembershipLocator (from the previous instance of
+ * InternalDistributedSystem.)
*/
- private void startInitLocator() throws InterruptedException {
- String locatorString = originalConfig.getStartLocator();
- if (locatorString.length() == 0) {
+ private void startInitLocator(
+ final MembershipLocator<InternalDistributedMember> membershipLocatorArg)
+ throws InterruptedException {
+
+ final String locatorString = originalConfig.getStartLocator();
+ final boolean shouldStartLocator = locatorString.length() > 0;
+
+ if (!shouldStartLocator) {
+ membershipLocator = membershipLocatorArg;
return;
}
@@ -886,10 +926,13 @@ public class InternalDistributedSystem extends DistributedSystem
boolean startedPeerLocation = false;
try {
startedLocator.startPeerLocation();
+ membershipLocator = startedLocator.getMembershipLocator();
startedPeerLocation = true;
} finally {
if (!startedPeerLocation) {
startedLocator.stop();
+ startedLocator = null;
+ membershipLocator = null;
}
}
} catch (IOException e) {
@@ -2568,7 +2611,8 @@ public class InternalDistributedSystem extends DistributedSystem
try {
- newDS = connectInternal(configProps, null, metricsService.getRebuilder());
+ newDS = connectInternal(configProps, null, metricsService.getRebuilder(),
+ membershipLocator);
} catch (CancelException e) {
if (isReconnectCancelled()) {
@@ -2961,6 +3005,8 @@ public class InternalDistributedSystem extends DistributedSystem
private SecurityConfig securityConfig;
private MetricsService.Builder metricsServiceBuilder;
+ private MembershipLocator<InternalDistributedMember> locator;
+
public Builder(Properties configProperties, MetricsService.Builder metricsServiceBuilder) {
this.configProperties = configProperties;
this.metricsServiceBuilder = metricsServiceBuilder;
@@ -2971,6 +3017,12 @@ public class InternalDistributedSystem extends DistributedSystem
return this;
}
+ public Builder setLocator(
+ final MembershipLocator<InternalDistributedMember> locator) {
+ this.locator = locator;
+ return this;
+ }
+
/**
* Builds and initializes new instance of InternalDistributedSystem.
*/
@@ -2989,7 +3041,7 @@ public class InternalDistributedSystem extends DistributedSystem
FunctionStatsManager::new);
newSystem
.initialize(securityConfig.getSecurityManager(), securityConfig.getPostProcessor(),
- metricsServiceBuilder);
+ metricsServiceBuilder, locator);
notifyConnectListeners(newSystem);
stopThreads = false;
return newSystem;
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
index 8466dfb..b7475ad 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
@@ -110,6 +110,7 @@ import org.apache.geode.management.internal.configuration.handlers.SharedConfigu
import org.apache.geode.management.internal.configuration.messages.ClusterManagementServiceInfoRequest;
import org.apache.geode.management.internal.configuration.messages.SharedConfigurationStatusRequest;
import org.apache.geode.management.internal.configuration.messages.SharedConfigurationStatusResponse;
+import org.apache.geode.metrics.internal.InternalDistributedSystemMetricsService;
import org.apache.geode.security.AuthTokenEnabledComponents;
/**
@@ -561,9 +562,9 @@ public class InternalLocator extends Locator implements ConnectListener, LogConf
membershipLocator.addHandler(InfoRequest.class, new InfoRequestHandler());
restartHandlers.add((ds, cache, sharedConfig) -> {
- InternalDistributedSystem ids = (InternalDistributedSystem) ds;
- Distribution distribution = ids.getDM().getDistribution();
- membershipLocator.setMembership(distribution.getMembership());
+ final InternalDistributedSystem ids = (InternalDistributedSystem) ds;
+ // let old locator know about new membership object
+ membershipLocator.setMembership(ids.getDM().getDistribution().getMembership());
});
}
@@ -658,7 +659,7 @@ public class InternalLocator extends Locator implements ConnectListener, LogConf
/**
* @return the TcpHandler for peer to peer discovery
*/
- public MembershipLocator getMembershipLocator() {
+ public MembershipLocator<InternalDistributedMember> getMembershipLocator() {
return membershipLocator;
}
@@ -733,13 +734,17 @@ public class InternalLocator extends Locator implements ConnectListener, LogConf
logger.info("Starting distributed system");
internalDistributedSystem =
- (InternalDistributedSystem) DistributedSystem.connect(distributedSystemProperties);
+ InternalDistributedSystem
+ .connectInternal(distributedSystemProperties, null,
+ new InternalDistributedSystemMetricsService.Builder(),
+ membershipLocator);
if (peerLocator) {
// We've created a peer location message handler - it needs to be connected to
// the membership service in order to get membership view notifications
membershipLocator
- .setMembership(internalDistributedSystem.getDM().getDistribution().getMembership());
+ .setMembership(internalDistributedSystem.getDM()
+ .getDistribution().getMembership());
}
internalDistributedSystem.addDisconnectListener(sys -> stop(false, false, false));
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorIntegrationTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorIntegrationTest.java
index cab82dc..7086f34 100644
--- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorIntegrationTest.java
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorIntegrationTest.java
@@ -30,7 +30,7 @@ import org.apache.geode.distributed.internal.membership.api.MemberDataBuilder;
import org.apache.geode.distributed.internal.membership.api.MemberIdentifier;
import org.apache.geode.distributed.internal.membership.api.MemberIdentifierFactoryImpl;
import org.apache.geode.distributed.internal.membership.api.MembershipConfigurationException;
-import org.apache.geode.distributed.internal.membership.gms.GMSMembership;
+import org.apache.geode.distributed.internal.membership.api.MembershipLocator;
import org.apache.geode.distributed.internal.membership.gms.GMSMembershipView;
import org.apache.geode.distributed.internal.membership.gms.MembershipLocatorStatisticsNoOp;
import org.apache.geode.distributed.internal.membership.gms.Services;
@@ -85,9 +85,9 @@ public class GMSLocatorIntegrationTest {
services.getSerializer().getObjectDeserializer()),
services.getSerializer().getObjectSerializer(),
services.getSerializer().getObjectDeserializer());
- GMSMembership membership = mock(GMSMembership.class);
- when(membership.getServices()).thenReturn(services);
- gmsLocator.setMembership(membership);
+
+ final MembershipLocator membershipLocator = mock(MembershipLocator.class);
+ gmsLocator.setServices(services);
}
@Test
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/LifecycleListener.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/LifecycleListener.java
index 32f5c49..68118ff 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/LifecycleListener.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/LifecycleListener.java
@@ -33,13 +33,6 @@ public interface LifecycleListener<ID extends MemberIdentifier> {
final ID memberID);
/**
- * Invoked when the Membership is starting. All membership services will have been
- * initialized and had their "started" methods invoked but we will not yet have joined
- * the cluster.
- */
- void started();
-
- /**
* Invoked when the Membership has successfully joined the cluster. At this point the
* membership address is stable.
*/
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/MembershipBuilder.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/MembershipBuilder.java
index d3f491a..220a0d0 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/MembershipBuilder.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/MembershipBuilder.java
@@ -32,6 +32,8 @@ public interface MembershipBuilder<ID extends MemberIdentifier> {
MembershipBuilder<ID> setMembershipListener(MembershipListener<ID> membershipListener);
+ MembershipBuilder<ID> setMembershipLocator(MembershipLocator<ID> membershipLocator);
+
MembershipBuilder<ID> setMessageListener(MessageListener<ID> messageListener);
MembershipBuilder<ID> setConfig(MembershipConfig membershipConfig);
@@ -45,6 +47,7 @@ public interface MembershipBuilder<ID extends MemberIdentifier> {
final TcpClient locatorClient,
final DSFIDSerializer serializer,
final MemberIdentifierFactory<ID> memberFactory) {
- return new MembershipBuilderImpl<>(socketCreator, locatorClient, serializer, memberFactory);
+ return new MembershipBuilderImpl<>(
+ socketCreator, locatorClient, serializer, memberFactory);
}
}
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java
index ccef4ef..4406939 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java
@@ -1890,7 +1890,6 @@ public class GMSMembership<ID extends MemberIdentifier> implements Membership<ID
@Override
public void started() throws MemberStartupException {
startCleanupTimer();
- lifecycleListener.started();
}
/* Service interface */
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/LifecycleListenerNoOp.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/LifecycleListenerNoOp.java
index 3924467..5f3f8f0 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/LifecycleListenerNoOp.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/LifecycleListenerNoOp.java
@@ -43,11 +43,6 @@ public class LifecycleListenerNoOp<ID extends MemberIdentifier> implements Lifec
}
@Override
- public void started() {
-
- }
-
- @Override
public void forcedDisconnect() {
}
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/MembershipBuilderImpl.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/MembershipBuilderImpl.java
index f6a46db..742c046 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/MembershipBuilderImpl.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/MembershipBuilderImpl.java
@@ -24,8 +24,10 @@ import org.apache.geode.distributed.internal.membership.api.MembershipBuilder;
import org.apache.geode.distributed.internal.membership.api.MembershipConfig;
import org.apache.geode.distributed.internal.membership.api.MembershipConfigurationException;
import org.apache.geode.distributed.internal.membership.api.MembershipListener;
+import org.apache.geode.distributed.internal.membership.api.MembershipLocator;
import org.apache.geode.distributed.internal.membership.api.MembershipStatistics;
import org.apache.geode.distributed.internal.membership.api.MessageListener;
+import org.apache.geode.distributed.internal.membership.gms.locator.MembershipLocatorImpl;
import org.apache.geode.distributed.internal.tcpserver.TcpClient;
import org.apache.geode.distributed.internal.tcpserver.TcpSocketCreator;
import org.apache.geode.internal.serialization.DSFIDSerializer;
@@ -47,6 +49,8 @@ public class MembershipBuilderImpl<ID extends MemberIdentifier> implements Membe
private final MemberIdentifierFactory<ID> memberFactory;
private LifecycleListener<ID> lifecycleListener = new LifecycleListenerNoOp();
+ private MembershipLocatorImpl<ID> membershipLocator;
+
public MembershipBuilderImpl(
final TcpSocketCreator socketCreator,
final TcpClient locatorClient,
@@ -77,6 +81,13 @@ public class MembershipBuilderImpl<ID extends MemberIdentifier> implements Membe
}
@Override
+ public MembershipBuilder<ID> setMembershipLocator(
+ final MembershipLocator<ID> membershipLocator) {
+ this.membershipLocator = (MembershipLocatorImpl<ID>) membershipLocator;
+ return this;
+ }
+
+ @Override
public MembershipBuilder<ID> setMessageListener(MessageListener<ID> messageListener) {
this.messageListener = messageListener;
return this;
@@ -99,9 +110,12 @@ public class MembershipBuilderImpl<ID extends MemberIdentifier> implements Membe
public Membership<ID> create() throws MembershipConfigurationException {
GMSMembership<ID> gmsMembership =
new GMSMembership<>(membershipListener, messageListener, lifecycleListener);
- Services<ID> services =
+ final Services<ID> services =
new Services<>(gmsMembership.getGMSManager(), statistics, authenticator,
membershipConfig, serializer, memberFactory, locatorClient, socketCreator);
+ if (membershipLocator != null) {
+ services.setLocators(membershipLocator.getGMSLocator(), membershipLocator);
+ }
services.init();
return gmsMembership;
}
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java
index a994e42..334dc9c 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java
@@ -46,6 +46,7 @@ import org.apache.geode.distributed.internal.membership.api.MemberStartupExcepti
import org.apache.geode.distributed.internal.membership.api.MembershipClosedException;
import org.apache.geode.distributed.internal.membership.api.MembershipConfig;
import org.apache.geode.distributed.internal.membership.api.MembershipConfigurationException;
+import org.apache.geode.distributed.internal.membership.api.MembershipLocator;
import org.apache.geode.distributed.internal.membership.api.MembershipStatistics;
import org.apache.geode.distributed.internal.membership.gms.fd.GMSHealthMonitor;
import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
@@ -57,6 +58,7 @@ import org.apache.geode.distributed.internal.membership.gms.locator.FindCoordina
import org.apache.geode.distributed.internal.membership.gms.locator.FindCoordinatorResponse;
import org.apache.geode.distributed.internal.membership.gms.locator.GetViewRequest;
import org.apache.geode.distributed.internal.membership.gms.locator.GetViewResponse;
+import org.apache.geode.distributed.internal.membership.gms.locator.MembershipLocatorImpl;
import org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave;
import org.apache.geode.distributed.internal.membership.gms.messages.FinalCheckPassedMessage;
import org.apache.geode.distributed.internal.membership.gms.messages.HeartbeatMessage;
@@ -103,6 +105,7 @@ public class Services<ID extends MemberIdentifier> {
private volatile Exception shutdownCause;
private Locator<ID> locator;
+ private MembershipLocator<ID> membershipLocator;
private final Timer timer = new Timer("Geode Membership Timer", true);
@@ -209,6 +212,18 @@ public class Services<ID extends MemberIdentifier> {
this.joinLeave.started();
this.healthMon.started();
this.manager.started();
+
+ if (membershipLocator != null) {
+ /*
+ * Now that all the services have started we can let the membership locator know
+ * about them. We must do this before telling the manager to joinDistributedSystem()
+ * later in this method
+ */
+ final MembershipLocatorImpl locatorImpl =
+ (MembershipLocatorImpl) this.membershipLocator;
+ locatorImpl.setServices(this);
+ }
+
logger.debug("All membership services have been started");
started = true;
} catch (RuntimeException e) {
@@ -330,12 +345,14 @@ public class Services<ID extends MemberIdentifier> {
return this.manager;
}
- public Locator<ID> getLocator() {
- return this.locator;
+ public void setLocators(final Locator<ID> locator,
+ final MembershipLocator<ID> membershipLocator) {
+ this.locator = locator;
+ this.membershipLocator = membershipLocator;
}
- public void setLocator(Locator<ID> locator) {
- this.locator = locator;
+ public Locator<ID> getLocator() {
+ return locator;
}
public JoinLeave<ID> getJoinLeave() {
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java
index f295362..815b28e 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java
@@ -39,10 +39,8 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.distributed.internal.membership.api.MemberIdentifier;
-import org.apache.geode.distributed.internal.membership.api.Membership;
import org.apache.geode.distributed.internal.membership.api.MembershipConfigurationException;
import org.apache.geode.distributed.internal.membership.api.MembershipLocatorStatistics;
-import org.apache.geode.distributed.internal.membership.gms.GMSMembership;
import org.apache.geode.distributed.internal.membership.gms.GMSMembershipView;
import org.apache.geode.distributed.internal.membership.gms.GMSUtil;
import org.apache.geode.distributed.internal.membership.gms.Services;
@@ -132,15 +130,20 @@ public class GMSLocator<ID extends MemberIdentifier> implements Locator<ID>, Tcp
this.objectDeserializer = objectDeserializer;
}
- public synchronized boolean setMembership(Membership<ID> membership) {
- if (services == null || services.isStopped()) {
- services = ((GMSMembership<ID>) membership).getServices();
- localAddress = services.getMessenger().getMemberID();
+ /**
+ * Called initially and after each auto-reconnect. See restart handlers in InternalLocator
+ * up in geode-core. Services must be started before this call.
+ *
+ */
+ public synchronized boolean setServices(
+ final Services<ID> services) {
+ if (this.services == null || this.services.isStopped()) {
+ this.services = services;
+ localAddress = this.services.getMessenger().getMemberID();
Objects.requireNonNull(localAddress, "member address should have been established");
logger.info("Peer locator is connecting to local membership services with ID {}",
localAddress);
- services.setLocator(this);
- GMSMembershipView<ID> newView = services.getJoinLeave().getView();
+ GMSMembershipView<ID> newView = this.services.getJoinLeave().getView();
if (newView != null) {
view = newView;
recoveredView = null;
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/MembershipLocatorImpl.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/MembershipLocatorImpl.java
index 7a0a2d3..9009d66 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/MembershipLocatorImpl.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/MembershipLocatorImpl.java
@@ -34,6 +34,8 @@ import org.apache.geode.distributed.internal.membership.api.MembershipConfig;
import org.apache.geode.distributed.internal.membership.api.MembershipConfigurationException;
import org.apache.geode.distributed.internal.membership.api.MembershipLocator;
import org.apache.geode.distributed.internal.membership.api.MembershipLocatorStatistics;
+import org.apache.geode.distributed.internal.membership.gms.GMSMembership;
+import org.apache.geode.distributed.internal.membership.gms.Services;
import org.apache.geode.distributed.internal.tcpserver.ProtocolChecker;
import org.apache.geode.distributed.internal.tcpserver.TcpClient;
import org.apache.geode.distributed.internal.tcpserver.TcpHandler;
@@ -141,8 +143,9 @@ public class MembershipLocatorImpl<ID extends MemberIdentifier> implements Membe
}
@Override
- public void setMembership(Membership<ID> membership) {
- gmsLocator.setMembership(membership);
+ public void setMembership(final Membership<ID> membership) {
+ final GMSMembership<ID> gmsMembership = (GMSMembership<ID>) membership;
+ setServices(gmsMembership.getServices());
}
@Override
@@ -156,10 +159,18 @@ public class MembershipLocatorImpl<ID extends MemberIdentifier> implements Membe
}
@VisibleForTesting
- public GMSLocator getGMSLocator() {
+ public GMSLocator<ID> getGMSLocator() {
return this.gmsLocator;
}
+ /**
+ * Services is a class internal to the membership module. As such, the ability to setServices
+ * is available ony within the module. It's not part of the external API.
+ */
+ public void setServices(final Services<ID> services) {
+ gmsLocator.setServices(services);
+ }
+
public void stop() {
if (isAlive()) {
logger.info("Stopping {}", this);