You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/08/11 23:51:24 UTC
[4/4] incubator-geode git commit: enabling multicast communications
and fixing a locator auto-restart problem. This disables the use of multicast
for discovery and all tests that were doing that have been changed to use
locators.
enabling multicast communications and fixing a locator auto-restart problem.
This disables the use of multicast for discovery and all tests that were
doing that have been changed to use locators.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/3c560cb9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/3c560cb9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/3c560cb9
Branch: refs/heads/feature/GEODE-77
Commit: 3c560cb937f604ec6413a79fb0b24597efa0eeb7
Parents: 14d3786
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Tue Aug 11 14:49:10 2015 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Tue Aug 11 14:50:33 2015 -0700
----------------------------------------------------------------------
.../gemfire/admin/AdminDistributedSystem.java | 5 -
.../internal/AdminDistributedSystemImpl.java | 15 +-
.../ManagedEntityConfigXmlGenerator.java | 21 +-
.../gemfire/distributed/DistributedSystem.java | 17 +-
.../internal/DistributionConfig.java | 2 +-
.../internal/DistributionConfigImpl.java | 16 +-
.../internal/HighPriorityAckedMessage.java | 4 +-
.../internal/InternalDistributedSystem.java | 92 ++----
.../distributed/internal/InternalLocator.java | 19 +-
.../distributed/internal/StartupMessage.java | 11 -
.../distributed/internal/StartupOperation.java | 1 -
.../membership/InternalDistributedMember.java | 4 +-
.../internal/membership/MemberAttributes.java | 13 -
.../internal/membership/gms/GMSUtil.java | 1 -
.../internal/membership/gms/Services.java | 2 +-
.../membership/gms/interfaces/Messenger.java | 2 +-
.../gms/locator/FindCoordinatorRequest.java | 2 +
.../gms/locator/FindCoordinatorResponse.java | 13 +-
.../membership/gms/locator/GMSLocator.java | 6 +-
.../membership/gms/membership/GMSJoinLeave.java | 252 ++++++++--------
.../gms/messages/JoinRequestMessage.java | 7 +-
.../gms/messages/JoinResponseMessage.java | 11 +
.../gms/messenger/JGroupsMessenger.java | 106 +++++--
.../gms/mgr/GMSMembershipManager.java | 37 ++-
.../gemfire/internal/AvailablePort.java | 73 ++---
.../admin/remote/RemoteGfManagerAgent.java | 37 +--
.../admin/remote/RemoteTransportConfig.java | 101 ++-----
.../cache/DistributedCacheOperation.java | 2 +-
.../internal/i18n/ParentLocalizedStrings.java | 2 +-
.../gemfire/internal/redis/RegionCache.java | 2 +-
.../CreateAlterDestroyRegionCommands.java | 13 +-
.../cli/functions/RegionCreateFunction.java | 5 +
.../cli/functions/RegionFunctionArgs.java | 33 +-
.../internal/cli/i18n/CliStrings.java | 12 +-
.../distributed/internal/jgroups-config.xml | 98 +++---
.../distributed/internal/jgroups-mcast.xml | 128 ++++----
.../AutoConnectionSourceWithUDPDUnitTest.java | 140 ---------
.../management/MemoryThresholdsDUnitTest.java | 95 ++----
.../MemoryThresholdsOffHeapDUnitTest.java | 68 ++---
.../query/dunit/QueryUsingPoolDUnitTest.java | 30 +-
.../cache/query/dunit/RemoteQueryDUnitTest.java | 27 +-
...esourceManagerWithQueryMonitorDUnitTest.java | 37 +--
.../functional/IndexCreationJUnitTest.java | 2 -
.../index/CopyOnReadIndexDUnitTest.java | 22 +-
.../PutAllWithIndexPerfDUnitDisabledTest.java | 3 +-
.../gemstone/gemfire/cache30/CacheTestCase.java | 47 ---
.../distributed/DistributedSystemDUnitTest.java | 21 +-
.../InternalDistributedSystemJUnitTest.java | 6 +-
.../internal/ProductUseLogDUnitTest.java | 2 +-
.../gemfire/internal/AvailablePortHelper.java | 2 +-
...wardCompatibilitySerializationDUnitTest.java | 298 +++++++++++++++++++
...wardCompatibilitySerializationJUnitTest.java | 296 ------------------
.../gemstone/gemfire/internal/FDDUnitTest.java | 18 +-
.../internal/cache/Bug41957DUnitTest.java | 8 +-
.../cache/ClientServerGetAllDUnitTest.java | 38 +--
.../cache/ClientServerTransactionDUnitTest.java | 2 +-
.../cache/execute/Bug51193DUnitTest.java | 8 +-
.../internal/cache/ha/Bug48571DUnitTest.java | 4 +-
.../internal/cache/ha/Bug48879DUnitTest.java | 7 +-
.../cache/partitioned/Bug43684DUnitTest.java | 23 +-
.../cache/partitioned/Bug51400DUnitTest.java | 6 +-
.../cache/tier/sockets/Bug36829DUnitTest.java | 5 +-
.../cache/tier/sockets/Bug37805DUnitTest.java | 3 +-
.../cache/tier/sockets/CacheServerTestUtil.java | 10 +-
.../DurableClientQueueSizeDUnitTest.java | 20 +-
.../sockets/DurableClientStatsDUnitTest.java | 12 +-
.../sockets/DurableRegistrationDUnitTest.java | 34 +--
.../sockets/InterestListFailoverDUnitTest.java | 5 +-
.../sockets/UnregisterInterestDUnitTest.java | 3 +-
.../management/LocatorManagementDUnitTest.java | 4 -
.../gemfire/test/golden/GoldenTestCase.java | 6 +-
.../test/java/dunit/DistributedTestCase.java | 28 +-
.../java/dunit/standalone/DUnitLauncher.java | 10 +
.../java/dunit/standalone/ProcessManager.java | 4 +
74 files changed, 1106 insertions(+), 1413 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/admin/AdminDistributedSystem.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/admin/AdminDistributedSystem.java b/gemfire-core/src/main/java/com/gemstone/gemfire/admin/AdminDistributedSystem.java
index be067f9..4b71bd2 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/admin/AdminDistributedSystem.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/admin/AdminDistributedSystem.java
@@ -121,11 +121,6 @@ public interface AdminDistributedSystem {
public String getLocators();
/**
- * Returns true if this system is using multicast instead of locators for discovery
- */
- public boolean isMcastDiscovery();
-
- /**
* Returns true if this system has enabled the use of multicast for communications
*/
public boolean isMcastEnabled();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/admin/internal/AdminDistributedSystemImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/admin/internal/AdminDistributedSystemImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/admin/internal/AdminDistributedSystemImpl.java
index 356d115..049b121 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/admin/internal/AdminDistributedSystemImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/admin/internal/AdminDistributedSystemImpl.java
@@ -452,11 +452,6 @@ implements com.gemstone.gemfire.admin.AdminDistributedSystem,
return false;
}
- /** Returns true if this system is using multicast instead of locators */
- public boolean isMcastDiscovery() {
- return this.isMcastEnabled() && (this.getLocators().length() == 0);
- }
-
/** Returns true if this system can use multicast for communications */
public boolean isMcastEnabled() {
return this.getMcastPort() > 0 ;
@@ -1336,11 +1331,9 @@ implements com.gemstone.gemfire.admin.AdminDistributedSystem,
this.getMcastPort()).append("]").toString();
locatorIds.add(new DistributionLocatorId(mcastId));
}
- if (!isMcastDiscovery()) {
- StringTokenizer st = new StringTokenizer(this.getLocators(), ",");
- while (st.hasMoreTokens()) {
- locatorIds.add(new DistributionLocatorId(st.nextToken()));
- }
+ StringTokenizer st = new StringTokenizer(this.getLocators(), ",");
+ while (st.hasMoreTokens()) {
+ locatorIds.add(new DistributionLocatorId(st.nextToken()));
}
if (logger.isDebugEnabled()) {
@@ -1739,7 +1732,7 @@ implements com.gemstone.gemfire.admin.AdminDistributedSystem,
// LOG: saves LogWriterLogger from AdminDistributedSystemImpl for RemoteGfManagerAgentConfig
private GfManagerAgentConfig buildAgentConfig(InternalLogWriter logWriter) {
RemoteTransportConfig conf = new RemoteTransportConfig(
- isMcastEnabled(), isMcastDiscovery(), getDisableTcp(),
+ isMcastEnabled(), getDisableTcp(),
getDisableAutoReconnect(),
getBindAddress(), buildSSLConfig(), parseLocators(),
getMembershipPortRange(), getTcpPort());
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/admin/internal/ManagedEntityConfigXmlGenerator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/admin/internal/ManagedEntityConfigXmlGenerator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/admin/internal/ManagedEntityConfigXmlGenerator.java
index 31eaf2f..f1a9dfa 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/admin/internal/ManagedEntityConfigXmlGenerator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/admin/internal/ManagedEntityConfigXmlGenerator.java
@@ -145,31 +145,14 @@ public class ManagedEntityConfigXmlGenerator
* Generates XML for locators in the distributed system
*/
private void generateDiscovery() throws SAXException {
- if (!this.system.isMcastDiscovery()) {
- handler.startElement("", LOCATORS, LOCATORS, EMPTY);
+ handler.startElement("", LOCATORS, LOCATORS, EMPTY);
- generateLocators();
- }
+ generateLocators();
handler.endElement("", LOCATORS, LOCATORS);
}
/**
- * Generates XML for multicast discovery
- */
- private void generateMulticast() throws SAXException {
- int port = this.system.getMcastPort();
- String address = this.system.getMcastAddress();
-
- AttributesImpl atts = new AttributesImpl();
- atts.addAttribute("", "", PORT, "", String.valueOf(port));
- atts.addAttribute("", "", ADDRESS, "", address);
-
- handler.startElement("", MULTICAST, MULTICAST, atts);
- handler.endElement("", MULTICAST, MULTICAST);
- }
-
- /**
* Generates XML for the distributed system's locators
*/
private void generateLocators() throws SAXException {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/DistributedSystem.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/DistributedSystem.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/DistributedSystem.java
index b7b2cd8..3f8040e 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/DistributedSystem.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/DistributedSystem.java
@@ -55,10 +55,10 @@ import com.gemstone.gemfire.security.GemFireSecurityException;
*
* When a program connects to the distributed system, a "distribution
* manager" is started in this VM and the other members of the
- * distributed system are located. This discovery can be performed
- * using either IP multicast (default) or by contacting "locators"
- * running on a given host and port. All connections that are
- * configured to use the same multicast address/port and the same
+ * distributed system are located. This discovery is performed
+ * by contacting "locators"
+ * running on a given host and port. All DistributedSystems that are
+ * configured to use the same same
* locators are part of the same distributed system.
*
* <P>
@@ -133,13 +133,10 @@ import com.gemstone.gemfire.security.GemFireSecurityException;
* <dl>
* <a name="mcast-port"><dt>mcast-port</dt></a>
* <dd><U>Description</U>: The port used for multicast networking.
- * If zero, then multicast will be disabled and locators must be used to find the other members
- * of the distributed system.
- * If "mcast-port" is zero and "locators" is ""
- * then this distributed system will be isolated from all other GemFire
- * processes.
+ * If zero, then multicast will be disabled and unicast messaging will
+ * be used.
* </dd>
- * <dd><U>Default</U>: "0" if locators is not ""; otherwise "10334"</dd>
+ * <dd><U>Default</U>: "0"</dd>
* </dl>
*
* <dl>
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionConfig.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionConfig.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionConfig.java
index 10094a9..4916fe3 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionConfig.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionConfig.java
@@ -101,7 +101,7 @@ public interface DistributionConfig extends Config, LogConfig {
public static final String MCAST_PORT_NAME = "mcast-port";
/** The default value of the "mcastPort" property */
- public static final int DEFAULT_MCAST_PORT = 10334;
+ public static final int DEFAULT_MCAST_PORT = 0;
/**
* The minimum mcastPort.
* <p> Actual value of this constant is <code>0</code>.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionConfigImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionConfigImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionConfigImpl.java
index b8dfeb3..a7d4232 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionConfigImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionConfigImpl.java
@@ -1137,14 +1137,16 @@ public class DistributionConfigImpl
private void computeMcastPortDefault() {
- ConfigSource cs = getAttSourceMap().get(MCAST_PORT_NAME);
- if (cs == null) {
- String locators = getLocators();
- if (locators != null && !locators.isEmpty()) {
- this.mcastPort = 0; // fixes 46308
- }
- }
+ // a no-op since multicast discovery has been removed
+ // and the default mcast port is now zero
+// ConfigSource cs = getAttSourceMap().get(MCAST_PORT_NAME);
+// if (cs == null) {
+// String locators = getLocators();
+// if (locators != null && !locators.isEmpty()) {
+// this.mcastPort = 0; // fixes 46308
+// }
+// }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/HighPriorityAckedMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/HighPriorityAckedMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/HighPriorityAckedMessage.java
index 0592433..edf700c 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/HighPriorityAckedMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/HighPriorityAckedMessage.java
@@ -57,7 +57,9 @@ public final class HighPriorityAckedMessage extends HighPriorityDistributionMess
public HighPriorityAckedMessage() {
super();
InternalDistributedSystem ds = InternalDistributedSystem.getAnyInstance();
- this.originDm = (DistributionManager)ds.getDistributionManager();
+ if (ds != null) {
+ this.originDm = (DistributionManager)ds.getDistributionManager();
+ }
this.id = this.originDm.getDistributionManagerId();
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
index 52ee2d0..d03f558 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
@@ -507,14 +507,17 @@ public final class InternalDistributedSystem
* current configuration state.
*/
private void initialize() {
- if (this.originalConfig.getMcastPort() == 0 && this.originalConfig.getLocators().equals("")) {
- // no distribution
- this.isLoner = true;
-// throw new IllegalArgumentException("The "
-// + DistributionConfig.LOCATORS_NAME
-// + " attribute can not be empty when the "
-// + DistributionConfig.MCAST_PORT_NAME
-// + " attribute is zero.");
+ if (this.originalConfig.getLocators().equals("")) {
+ if (this.originalConfig.getMcastPort() != 0) {
+ throw new GemFireConfigException("The "
+ + DistributionConfig.LOCATORS_NAME
+ + " attribute can not be empty when the "
+ + DistributionConfig.MCAST_PORT_NAME
+ + " attribute is non-zero.");
+ } else {
+ // no distribution
+ this.isLoner = true;
+ }
}
if (this.isLoner) {
@@ -1457,26 +1460,19 @@ public final class InternalDistributedSystem
// @todo Do we need to compare SSL properties?
- if (me.getMcastPort() != 0) {
- // mcast
- return me.getMcastPort() == other.getMcastPort() &&
- me.getMcastAddress().equals(other.getMcastAddress());
+ // locators
+ String myLocators = me.getLocators();
+ String otherLocators = other.getLocators();
- } else {
- // locators
- String myLocators = me.getLocators();
- String otherLocators = other.getLocators();
-
- // quick check
- if (myLocators.equals(otherLocators)) {
- return true;
+ // quick check
+ if (myLocators.equals(otherLocators)) {
+ return true;
- } else {
- myLocators = canonicalizeLocators(myLocators);
- otherLocators = canonicalizeLocators(otherLocators);
+ } else {
+ myLocators = canonicalizeLocators(myLocators);
+ otherLocators = canonicalizeLocators(otherLocators);
- return myLocators.equals(otherLocators);
- }
+ return myLocators.equals(otherLocators);
}
}
@@ -1492,19 +1488,17 @@ public final class InternalDistributedSystem
String l = st.nextToken();
StringBuffer canonical = new StringBuffer();
DistributionLocatorId locId = new DistributionLocatorId(l);
- if (!locId.isMcastId()) {
- String addr = locId.getBindAddress();
- if (addr != null && addr.trim().length() > 0) {
- canonical.append(addr);
- }
- else {
- canonical.append(locId.getHost().getHostAddress());
- }
- canonical.append("[");
- canonical.append(String.valueOf(locId.getPort()));
- canonical.append("]");
- sorted.add(canonical.toString());
+ String addr = locId.getBindAddress();
+ if (addr != null && addr.trim().length() > 0) {
+ canonical.append(addr);
+ }
+ else {
+ canonical.append(locId.getHost().getHostAddress());
}
+ canonical.append("[");
+ canonical.append(String.valueOf(locId.getPort()));
+ canonical.append("]");
+ sorted.add(canonical.toString());
}
StringBuffer sb = new StringBuffer();
@@ -2578,12 +2572,6 @@ public final class InternalDistributedSystem
// logger.info("reconnecting IDS@"+System.identityHashCode(this));
- boolean mcastDiscovery = oldConfig.getLocators().isEmpty()
- && oldConfig.getStartLocator().isEmpty()
- && oldConfig.getMcastPort() != 0;
- boolean mcastQuorumContacted = false;
-
-
if (Thread.currentThread().getName().equals("CloserThread")) {
if (isDebugEnabled) {
logger.debug("changing thread name to ReconnectThread"); // wha?! really?
@@ -2686,24 +2674,6 @@ public final class InternalDistributedSystem
System.setProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE, "true");
}
// log.fine("DistributedSystem@"+System.identityHashCode(this)+" reconnecting distributed system. attempt #"+reconnectAttemptCounter);
- if (mcastDiscovery && (quorumChecker != null) && !mcastQuorumContacted) {
- mcastQuorumContacted = quorumChecker.checkForQuorum(3*this.config.getMemberTimeout());
- if (!mcastQuorumContacted) {
- if (logger.isDebugEnabled()) {
- logger.debug("quorum check failed - skipping reconnect attempt");
- }
- continue;
- }
- if (logger.isDebugEnabled()) {
- logger.debug(LocalizedMessage.create(LocalizedStrings.InternalDistributedSystem_QUORUM_OF_MEMBERS_CONTACTED));
- }
- mcastQuorumContacted = true;
- // bug #51527: become more aggressive about reconnecting since there are other
- // members around now
- if (timeOut > 5000) {
- timeOut = 5000;
- }
- }
configProps.put(DistributionConfig.DS_RECONNECTING_NAME, Boolean.TRUE);
if (quorumChecker != null) {
configProps.put(DistributionConfig.DS_QUORUM_CHECKER_NAME, quorumChecker);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalLocator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalLocator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalLocator.java
index 5525378..f649713 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalLocator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalLocator.java
@@ -179,7 +179,7 @@ public class InternalLocator extends Locator implements ConnectListener {
private volatile boolean isSharedConfigurationStarted = false;
- private Thread restartThread;
+ private volatile Thread restartThread;
public boolean isSharedConfigurationEnabled() {
@@ -1039,7 +1039,23 @@ public class InternalLocator extends Locator implements ConnectListener {
restarted = false;
this.server.join();
if (this.stoppedForReconnect) {
+ logger.info("waiting for distributed system to disconnect...");
+ while (this.myDs.isConnected()) {
+ Thread.sleep(5000);
+ }
+ logger.info("waiting for distributed system to reconnect...");
restarted = this.myDs.waitUntilReconnected(-1, TimeUnit.SECONDS);
+ if (restarted) {
+ logger.info("system restarted");
+ } else {
+ logger.info("system was not restarted");
+ }
+ Thread rs = this.restartThread;
+ if (rs != null) {
+ logger.info("waiting for services to restart...");
+ rs.join();
+ this.restartThread = null;
+ }
}
} while (restarted);
}
@@ -1124,6 +1140,7 @@ public class InternalLocator extends Locator implements ConnectListener {
restarted = true;
}
}
+ logger.info("restart thread exiting. Service was "+(restarted? "" : "not ") + "restarted");
return restarted;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java
index 169c3c3..93e3a5c 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java
@@ -41,7 +41,6 @@ public final class StartupMessage extends HighPriorityDistributionMessage implem
private Stub directChannel;
private String version = GemFireVersion.getGemFireVersion(); // added for bug 29005
private int replyProcessorId;
- private boolean isMcastDiscovery;
private boolean isMcastEnabled;
private boolean isTcpDisabled;
private Set interfaces;
@@ -143,14 +142,6 @@ public final class StartupMessage extends HighPriorityDistributionMessage implem
// }
/**
- * Sets the mcastDiscovery flag for this message
- * @since 5.0
- */
- void setMcastDiscovery(boolean flag) {
- isMcastDiscovery = flag;
- }
-
- /**
* Sets the tcpDisabled flag for this message
* @since 5.0
*/
@@ -333,7 +324,6 @@ public final class StartupMessage extends HighPriorityDistributionMessage implem
DataSerializer.writeString(this.version, out);
out.writeInt(this.replyProcessorId);
out.writeBoolean(this.isMcastEnabled);
- out.writeBoolean(this.isMcastDiscovery);
out.writeBoolean(this.isTcpDisabled);
// Send a description of all of the DataSerializers and
@@ -408,7 +398,6 @@ public final class StartupMessage extends HighPriorityDistributionMessage implem
this.version = DataSerializer.readString(in);
this.replyProcessorId = in.readInt();
this.isMcastEnabled = in.readBoolean();
- this.isMcastDiscovery = in.readBoolean();
this.isTcpDisabled = in.readBoolean();
int serializerCount = in.readInt();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupOperation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupOperation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupOperation.java
index d1070b8..57eae3d 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupOperation.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupOperation.java
@@ -58,7 +58,6 @@ public class StartupOperation {
msg.setEnforceUniqueZone(enforceUniqueZone);
msg.setDirectChannel(dm.getDirectChannel());
msg.setMcastEnabled(transport.isMcastEnabled());
- msg.setMcastDiscovery(transport.isMcastDiscovery());
msg.setMcastPort(dm.getSystem().getOriginalConfig().getMcastPort());
msg.setMcastHostAddress(dm.getSystem().getOriginalConfig().getMcastAddress());
msg.setTcpDisabled(transport.isTcpDisabled());
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
index 470fe3d..1fefcb9 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
@@ -971,11 +971,11 @@ public final class InternalDistributedMember
public void fromData(DataInput in)
throws IOException, ClassNotFoundException {
- fromDataPre_9_0_0_0(in);
+ fromDataPre_GFE_9_0_0_0(in);
netMbr.readAdditionalData(in);
}
- public void fromDataPre_9_0_0_0(DataInput in)
+ public void fromDataPre_GFE_9_0_0_0(DataInput in)
throws IOException, ClassNotFoundException {
InetAddress inetAddr = DataSerializer.readInetAddress(in);
int port = in.readInt();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MemberAttributes.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MemberAttributes.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MemberAttributes.java
index 558847e..54fd306 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MemberAttributes.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MemberAttributes.java
@@ -224,19 +224,6 @@ public class MemberAttributes implements DataSerializable {
}
/**
- * Set the VmPid to be the given value. This may be done by JGroups UDP
- * protocol if there is no PID available to augment its membership port number.
- * This functionality was added by us for bug #41983
- * @param uniqueID
- */
- public static void setDefaultVmPid(int uniqueID) {
- // note: JGroupMembershipManager establishes DEFAULT before attempting to
- // create a JGroups channel, so we know it isn't INVALID here
- setDefaults(DEFAULT.dcPort, uniqueID, DEFAULT.vmKind, DEFAULT.vmViewId, DEFAULT.name,
- DEFAULT.groups, DEFAULT.durableClientAttributes);
- }
-
- /**
* @return the membership view number in which this member was born
*/
public int getVmViewId() {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java
index c65f4d0..abd9f12 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java
@@ -22,7 +22,6 @@ public class GMSUtil {
try {
if (bindAddress == null || bindAddress.trim().length() == 0) {
addr = SocketCreator.getLocalHost();
- logger.info("Peer-to-peer bind address was null - checking for locator communications using " + addr);
} else {
addr = InetAddress.getByName(bindAddress);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
index 7478ef3..79830b9 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
@@ -138,7 +138,7 @@ public class Services {
this.joinLeave.started();
this.healthMon.started();
this.manager.started();
-
+ logger.info("Membership: all services have been started");
this.manager.joinDistributedSystem();
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
index 2c7e4e1..d01dcd8 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
@@ -16,7 +16,7 @@ public interface Messenger extends Service {
* sends an asynchronous message. Returns destinations that did not
* receive the message due to no longer being in the view
*/
- Set<InternalDistributedMember> send(DistributionMessage m) throws IOException;
+ Set<InternalDistributedMember> send(DistributionMessage m);
/**
* returns the endpoint ID for this member
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java
index 42d2006..0a7370f 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java
@@ -5,6 +5,8 @@ import java.io.DataOutput;
import java.io.IOException;
import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.DataSerializableFixedID;
import com.gemstone.gemfire.internal.Version;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
index 7ac25db..ea443e6 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
@@ -5,17 +5,22 @@ import java.io.DataOutput;
import java.io.IOException;
import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.DataSerializableFixedID;
import com.gemstone.gemfire.internal.Version;
-public class FindCoordinatorResponse implements DataSerializableFixedID {
+public class FindCoordinatorResponse implements DataSerializableFixedID {
+
private InternalDistributedMember coordinator;
+ private boolean fromView;
private boolean networkPartitionDetectionEnabled;
private boolean usePreferredCoordinators;
public FindCoordinatorResponse(InternalDistributedMember coordinator,
+ boolean fromView,
boolean networkPartitionDectionEnabled, boolean usePreferredCoordinators) {
this.coordinator = coordinator;
this.networkPartitionDetectionEnabled = networkPartitionDectionEnabled;
@@ -38,6 +43,10 @@ public class FindCoordinatorResponse implements DataSerializableFixedID {
return coordinator;
}
+ public boolean isFromView() {
+ return fromView;
+ }
+
@Override
public String toString() {
return "FindCoordinatorResponse(coordinator="+coordinator+")";
@@ -58,6 +67,7 @@ public class FindCoordinatorResponse implements DataSerializableFixedID {
@Override
public void toData(DataOutput out) throws IOException {
DataSerializer.writeObject(coordinator, out);
+ out.writeBoolean(fromView);
out.writeBoolean(networkPartitionDetectionEnabled);
out.writeBoolean(usePreferredCoordinators);
}
@@ -65,6 +75,7 @@ public class FindCoordinatorResponse implements DataSerializableFixedID {
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
coordinator = DataSerializer.readObject(in);
+ fromView = in.readBoolean();
networkPartitionDetectionEnabled = in.readBoolean();
usePreferredCoordinators = in.readBoolean();
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
index 0eb29a1..a988dec 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
@@ -135,8 +135,12 @@ public class GMSLocator implements Locator, NetLocator {
if (view == null) {
findServices();
}
+
+ boolean fromView = false;
+
if (view != null) {
coord = view.getCoordinator();
+ fromView = true;
}
if (coord != null) {
@@ -161,7 +165,7 @@ public class GMSLocator implements Locator, NetLocator {
}
}
}
- response = new FindCoordinatorResponse(coord,
+ response = new FindCoordinatorResponse(coord, fromView,
this.networkPartitionDetectionEnabled, this.usePreferredCoordinators);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index e3c2ba2..cb4f9c9 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -1,5 +1,13 @@
package com.gemstone.gemfire.distributed.internal.membership.gms.membership;
+import static com.gemstone.gemfire.distributed.internal.DistributionManager.LOCATOR_DM_TYPE;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.INSTALL_VIEW_MESSAGE;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_REQUEST;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_RESPONSE;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.LEAVE_REQUEST_MESSAGE;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.REMOVE_MEMBER_MESSAGE;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.VIEW_ACK_MESSAGE;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -30,8 +38,8 @@ import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.NetView;
import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember;
-import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
import com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinLeave;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler;
import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
@@ -43,7 +51,6 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRe
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.RemoveMemberMessage;
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.ViewAckMessage;
import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
-import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.security.AuthenticationFailedException;
@@ -60,7 +67,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
private static final int JOIN_ATTEMPTS = Integer.getInteger("gemfire.join-attempts", 4);
/** amount of time to sleep before trying to join after a failed attempt */
- private static final int JOIN_RETRY_SLEEP = Integer.getInteger("gemfire.join-retry-sleep", 3000);
+ private static final int JOIN_RETRY_SLEEP = Integer.getInteger("gemfire.join-retry-sleep", 1000);
/** amount of time to wait for a view to be acked by all members before performing suspect processing on non-responders */
private static final int VIEW_INSTALLATION_TIMEOUT = Integer.getInteger("gemfire.view-ack-timeout", 12500);
@@ -83,7 +90,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
private Services services;
- private boolean isConnected;
+ /** have I connected to the distributed system? */
+ private boolean isJoined;
/** a lock governing GMS state */
private ReadWriteLock stateLock = new ReentrantReadWriteLock();
@@ -91,6 +99,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
/** guarded by stateLock */
private boolean isCoordinator;
+ /** a synch object that guards view installation */
private final Object viewInstallationLock = new Object();
/** the currently installed view */
@@ -110,17 +119,22 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
/** collects the response to a join request */
private JoinResponseMessage[] joinResponse = new JoinResponseMessage[1];
- private ViewReplyProcessor viewResponses = new ViewReplyProcessor(false);
+ /** collects responses to new views */
+ private ViewReplyProcessor viewProcessor = new ViewReplyProcessor(false);
- private ViewReplyProcessor prepareResponses = new ViewReplyProcessor(true);
+ /** collects responses to view preparation messages */
+ private ViewReplyProcessor prepareProcessor = new ViewReplyProcessor(true);
+ /** whether quorum checks can cause a forced-disconnect */
private boolean quorumRequired = false;
+ /** timeout in receiving view acknowledgement */
private int viewAckTimeout;
/** background thread that creates new membership views */
private ViewCreator viewCreator;
+ /** am I shutting down? */
private volatile boolean isStopping;
@@ -137,6 +151,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
* @return true if successful, false if not
*/
public boolean join() {
+
+ if (this.localAddress.getVmKind() == LOCATOR_DM_TYPE
+ && Boolean.getBoolean("gemfire.first-member")) {
+ becomeCoordinator();
+ return true;
+ }
+
for (int tries=0; tries<JOIN_ATTEMPTS; tries++) {
InternalDistributedMember coord = findCoordinator();
logger.debug("found possible coordinator {}", coord);
@@ -158,7 +179,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return false;
}
} // for
- return this.isConnected;
+ return this.isJoined;
}
/**
@@ -172,11 +193,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
logger.info("Attempting to join the distributed system through coordinator " + coord + " using address " + this.localAddress);
JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress,
services.getAuthenticator().getCredentials());
- try {
- services.getMessenger().send(req);
- } catch (IOException e) {
- throw new SystemConnectException("Exception caught while trying to join", e);
- }
+
+ services.getMessenger().send(req);
+
JoinResponseMessage response = null;
synchronized(joinResponse) {
if (joinResponse[0] == null) {
@@ -213,6 +232,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return false;
}
+
/**
* process a join request from another member. If this is the coordinator
* this method will enqueue the request for processing in another thread.
@@ -226,11 +246,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
incomingRequest.getMemberID());
JoinResponseMessage m = new JoinResponseMessage("Rejecting the attempt of a member using an older version");
m.setRecipient(incomingRequest.getMemberID());
- try {
- services.getMessenger().send(m);
- } catch (IOException e) {
- //ignore - the attempt has been logged and the member can't join
- }
+ services.getMessenger().send(m);
return;
}
Object creds = incomingRequest.getCredentials();
@@ -244,11 +260,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (rejection != null && rejection.length() > 0) {
JoinResponseMessage m = new JoinResponseMessage(rejection);
m.setRecipient(incomingRequest.getMemberID());
- try {
- services.getMessenger().send(m);
- } catch (IOException e2) {
- logger.info("unable to send join response " + rejection + " to " + incomingRequest.getMemberID(), e2);
- }
+ services.getMessenger().send(m);
}
}
recordViewRequest(incomingRequest);
@@ -360,14 +372,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (isCoordinator) {
return;
}
- logger.debug("JoinLeave: this member is becoming the membership coordinator with address {}", localAddress);
+ logger.info("This member is becoming the membership coordinator with address {}", localAddress);
isCoordinator = true;
if (currentView == null) {
// create the initial membership view
NetView newView = new NetView(this.localAddress);
this.localAddress.setVmViewId(0);
installView(newView);
- isConnected = true;
+ isJoined = true;
startCoordinatorServices();
} else {
// create and send out a new view
@@ -381,9 +393,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
leaving.add(oldCoordinator);
}
newView = new NetView(this.localAddress, viewNumber, mbrs, leaving,
- Collections.EMPTY_LIST);
+ Collections.<InternalDistributedMember>emptyList());
}
- sendView(newView);
+ sendView(newView, Collections.<InternalDistributedMember>emptyList());
startCoordinatorServices();
}
} finally {
@@ -395,11 +407,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
private void sendJoinResponses(List<InternalDistributedMember> newMbrs, NetView newView) {
for (InternalDistributedMember mbr: newMbrs) {
JoinResponseMessage response = new JoinResponseMessage(mbr, newView);
- try {
- services.getMessenger().send(response);
- } catch (IOException e) {
- logger.info("unable to send join response to {}", mbr);
- }
+ services.getMessenger().send(response);
}
}
@@ -408,39 +416,36 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
Iterator<String> reason = reasons.iterator();
for (InternalDistributedMember mbr: newMbrs) {
RemoveMemberMessage response = new RemoveMemberMessage(mbr, mbr, reason.next());
- try {
- services.getMessenger().send(response);
- } catch (IOException e) {
- logger.info("unable to send remove message to {}", mbr);
- }
+ services.getMessenger().send(response);
}
}
- boolean prepareView(NetView view) {
- return sendView(view, true, this.prepareResponses);
+ boolean prepareView(NetView view, Collection<InternalDistributedMember> newMembers) {
+ return sendView(view, newMembers, true, this.prepareProcessor);
}
- void sendView(NetView view) {
- sendView(view, false, this.viewResponses);
+ void sendView(NetView view, Collection<InternalDistributedMember> newMembers) {
+ sendView(view, newMembers, false, this.viewProcessor);
}
- boolean sendView(NetView view, boolean preparing, ViewReplyProcessor rp) {
+ boolean sendView(NetView view, Collection<InternalDistributedMember> newMembers, boolean preparing, ViewReplyProcessor rp) {
int id = view.getViewId();
InstallViewMessage msg = new InstallViewMessage(view, services.getAuthenticator().getCredentials(), preparing);
Set<InternalDistributedMember> recips = new HashSet<InternalDistributedMember>(view.getMembers());
+ recips.removeAll(newMembers); // new members get the view in a JoinResponseMessage
+ recips.remove(this.localAddress); // no need to send it to ourselves
+ installView(view);
recips.addAll(view.getCrashedMembers());
+ if (recips.isEmpty()) {
+ return true;
+ }
msg.setRecipients(recips);
rp.initialize(id, recips);
- logger.info("View Creator " + (preparing? "preparing" : "sending") + " new view " + view);
- try {
- services.getMessenger().send(msg);
- }
- catch (IOException e) {
- logger.warn("Unsuccessful in installing new membership view", e);
- return false;
- }
+
+ logger.info((preparing? "preparing" : "sending") + " new view " + view);
+ services.getMessenger().send(msg);
// only wait for responses during preparation
if (preparing) {
@@ -479,11 +484,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (m.isPreparing()) {
if (this.preparedView != null && this.preparedView.getViewId() >= view.getViewId()) {
- try {
- services.getMessenger().send(new ViewAckMessage(m.getSender(), this.preparedView));
- } catch (IOException e) {
- logger.info("unable to send view response to " + m.getSender(), e);
- }
+ services.getMessenger().send(new ViewAckMessage(m.getSender(), this.preparedView));
}
else {
this.preparedView = view;
@@ -506,20 +507,16 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
private void ackView(InstallViewMessage m) {
if (m.getView().contains(m.getView().getCreator())) {
- try {
- services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId(), m.isPreparing()));
- } catch (IOException e) {
- logger.info("unable to send view response to " + m.getSender(), e);
- }
+ services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId(), m.isPreparing()));
}
}
private void processViewAckMessage(ViewAckMessage m) {
if (m.isPrepareAck()) {
- this.prepareResponses.processViewResponse(m.getViewId(), m.getSender(), m.getAlternateView());
+ this.prepareProcessor.processViewResponse(m.getViewId(), m.getSender(), m.getAlternateView());
} else {
- this.viewResponses.processViewResponse(m.getViewId(), m.getSender(), m.getAlternateView());
+ this.viewProcessor.processViewResponse(m.getViewId(), m.getSender(), m.getAlternateView());
}
}
@@ -530,12 +527,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
* @return
*/
private InternalDistributedMember findCoordinator() {
- if (locators == null) {
- DistributionConfig dconfig = services.getConfig().getDistributionConfig();
- String bindAddr = dconfig.getBindAddress();
- locators = GMSUtil.parseLocators(dconfig.getLocators(), bindAddr);
- }
-
assert this.localAddress != null;
FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress);
@@ -553,11 +544,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (response != null && response.getCoordinator() != null) {
anyResponses = false;
coordinators.add(response.getCoordinator());
- GMSMember mbr = (GMSMember)this.localAddress.getNetMember();
- services.getConfig().setNetworkPartitionDetectionEnabled(response.isNetworkPartitionDetectionEnabled());
- if (response.isUsePreferredCoordinators()
- && localAddress.getVmKind() != DistributionManager.LOCATOR_DM_TYPE) {
- mbr.setPreferredForCoordinator(false);
+ if (response.isFromView()) {
+ GMSMember mbr = (GMSMember)this.localAddress.getNetMember();
+ services.getConfig().setNetworkPartitionDetectionEnabled(response.isNetworkPartitionDetectionEnabled());
+ if (response.isUsePreferredCoordinators()
+ && localAddress.getVmKind() != DistributionManager.LOCATOR_DM_TYPE) {
+ mbr.setPreferredForCoordinator(false);
+ }
}
}
} catch (IOException | ClassNotFoundException problem) {
@@ -646,7 +639,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
if (!this.isCoordinator) {
- // get rid of outdated requests
+ // get rid of outdated requests. It's possible some requests are
+ // newer than the view just processed - the senders will have to
+ // resend these
synchronized(viewRequests) {
for (Iterator<DistributionMessage> it = viewRequests.iterator(); it.hasNext(); ) {
DistributionMessage m = it.next();
@@ -712,7 +707,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
@Override
public void emergencyClose() {
isStopping = true;
- isConnected = false;
+ isJoined = false;
stopCoordinatorServices();
isCoordinator = false;
currentView = null;
@@ -772,22 +767,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
newView.remove(localAddress);
InstallViewMessage m = new InstallViewMessage(newView, services.getAuthenticator().getCredentials());
m.setRecipients(newView.getMembers());
- try {
- services.getMessenger().send(m);
- try { Thread.sleep(LEAVE_MESSAGE_SLEEP_TIME); }
- catch (InterruptedException e) { Thread.currentThread().interrupt(); }
- } catch (IOException e) {
- logger.info("JoinLeave: unable to notify remaining members shutdown due to i/o exception", e);
- }
+ services.getMessenger().send(m);
+ try { Thread.sleep(LEAVE_MESSAGE_SLEEP_TIME); }
+ catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
else {
logger.debug("JoinLeave sending a leave request to {}", view.getCoordinator());
LeaveRequestMessage m = new LeaveRequestMessage(view.getCoordinator(), this.localAddress);
- try {
- services.getMessenger().send(m);
- } catch (IOException e) {
- logger.info("JoinLeave: unable to notify membership coordinator of shutdown due to i/o exception", e);
- }
+ services.getMessenger().send(m);
}
} // view.size
}// view != null
@@ -803,11 +790,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (v != null) {
RemoveMemberMessage msg = new RemoveMemberMessage(v.getCoordinator(), m,
reason);
- try {
- services.getMessenger().send(msg);
- } catch (IOException e) {
- logger.info("JoinLeave was unable to remove member " + m + " due to an i/o exception");
- }
+ services.getMessenger().send(msg);
}
}
@@ -825,6 +808,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
services.getMessenger().addHandler(ViewAckMessage.class, this);
services.getMessenger().addHandler(LeaveRequestMessage.class, this);
services.getMessenger().addHandler(RemoveMemberMessage.class, this);
+ services.getMessenger().addHandler(JoinRequestMessage.class, this);
+ services.getMessenger().addHandler(JoinResponseMessage.class, this);
DistributionConfig dc = services.getConfig().getDistributionConfig();
int ackCollectionTimeout = dc.getMemberTimeout() * 2 * 12437 / 10000;
@@ -838,6 +823,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
this.quorumRequired = services.getConfig().getDistributionConfig().getEnableNetworkPartitionDetection();
+ DistributionConfig dconfig = services.getConfig().getDistributionConfig();
+ String bindAddr = dconfig.getBindAddress();
+ locators = GMSUtil.parseLocators(dconfig.getLocators(), bindAddr);
}
@Override
@@ -846,19 +834,26 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return;
}
logger.debug("JoinLeave processing {}", m);
- if (m instanceof JoinRequestMessage) {
+ switch (m.getDSFID()) {
+ case JOIN_REQUEST:
processJoinRequest((JoinRequestMessage)m);
- } else if (m instanceof JoinResponseMessage) {
+ break;
+ case JOIN_RESPONSE:
processJoinResponse((JoinResponseMessage)m);
- } else if (m instanceof InstallViewMessage) {
+ break;
+ case INSTALL_VIEW_MESSAGE:
processViewMessage((InstallViewMessage)m);
- } else if (m instanceof ViewAckMessage) {
+ break;
+ case VIEW_ACK_MESSAGE:
processViewAckMessage((ViewAckMessage)m);
- } else if (m instanceof LeaveRequestMessage) {
+ break;
+ case LEAVE_REQUEST_MESSAGE:
processLeaveRequest((LeaveRequestMessage)m);
- } else if (m instanceof RemoveMemberMessage) {
+ break;
+ case REMOVE_MEMBER_MESSAGE:
processRemoveRequest((RemoveMemberMessage)m);
- } else {
+ break;
+ default:
throw new IllegalArgumentException("unknown message type: " + m);
}
}
@@ -882,6 +877,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
this.waiting = true;
this.viewId = viewId;
this.recipients = recips;
+ this.conflictingView = null;
}
void processViewResponse(int viewId, InternalDistributedMember sender, NetView conflictingView) {
@@ -894,15 +890,17 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
this.conflictingViewSender = sender;
this.conflictingView = conflictingView;
}
+
Set<InternalDistributedMember> waitingFor = this.recipients;
- waitingFor.remove(sender);
- if (waitingFor.isEmpty()) {
- synchronized(waitingFor) {
+ synchronized(waitingFor) {
+ waitingFor.remove(sender);
+ if (waitingFor.isEmpty()) {
+ logger.debug("All view responses received - notifying waiting thread");
waitingFor.notify();
}
}
+
}
-
}
Set<InternalDistributedMember> waitForResponses() {
@@ -916,6 +914,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
result.wait(1000);
}
} catch (InterruptedException e) {
+ logger.debug("Interrupted while waiting for view resonses");
Thread.currentThread().interrupt();
return result;
}
@@ -942,6 +941,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
+
class ViewCreator extends Thread {
boolean shutdown = false;
@@ -1076,9 +1076,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (this.shutdown || Thread.currentThread().isInterrupted()) {
return false;
}
- prepared = prepareView(newView);
+ prepared = prepareView(newView, joinReqs);
if (!prepared && quorumRequired) {
- Set<InternalDistributedMember> unresponsive = prepareResponses.getUnresponsiveMembers();
+ Set<InternalDistributedMember> unresponsive = prepareProcessor.getUnresponsiveMembers();
try {
removeHealthyMembers(unresponsive);
} catch (InterruptedException e) {
@@ -1087,24 +1087,26 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return false;
}
- List<InternalDistributedMember> failures = new ArrayList<InternalDistributedMember>(currentView.getCrashedMembers().size() + unresponsive.size());
- failures.addAll(unresponsive);
-
- NetView conflictingView = prepareResponses.getConflictingView();
- if (conflictingView != null
- && !conflictingView.getCreator().equals(localAddress)
- && conflictingView.getViewId() > newView.getViewId()
- && (lastConflictingView == null || conflictingView.getViewId() > lastConflictingView.getViewId())) {
- lastConflictingView = conflictingView;
- failures.addAll(conflictingView.getCrashedMembers());
- }
-
- failures.removeAll(removalReqs);
- if (failures.size() > 0) {
- // abort the current view and try again
- removalReqs.addAll(failures);
- newView = new NetView(localAddress, newView.getViewId()+1, newView.getMembers(), leaveReqs,
- removalReqs);
+ if (!unresponsive.isEmpty()) {
+ List<InternalDistributedMember> failures = new ArrayList<InternalDistributedMember>(currentView.getCrashedMembers().size() + unresponsive.size());
+ failures.addAll(unresponsive);
+
+ NetView conflictingView = prepareProcessor.getConflictingView();
+ if (conflictingView != null
+ && !conflictingView.getCreator().equals(localAddress)
+ && conflictingView.getViewId() > newView.getViewId()
+ && (lastConflictingView == null || conflictingView.getViewId() > lastConflictingView.getViewId())) {
+ lastConflictingView = conflictingView;
+ failures.addAll(conflictingView.getCrashedMembers());
+ }
+
+ failures.removeAll(removalReqs);
+ if (failures.size() > 0) {
+ // abort the current view and try again
+ removalReqs.addAll(failures);
+ newView = new NetView(localAddress, newView.getViewId()+1, newView.getMembers(), leaveReqs,
+ removalReqs);
+ }
}
}
} while (!prepared);
@@ -1112,7 +1114,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
lastConflictingView = null;
- sendView(newView);
+ sendView(newView, joinReqs);
return true;
}
@@ -1133,10 +1135,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
logger.info("checking state of member " + fmbr);
if (services.getHealthMonitor().checkIfAvailable(fmbr, "Member failed to acknowledge a membership view", false)) {
logger.info("member " + fmbr + " passed availability check");
- return null;
+ return fmbr;
}
logger.info("member " + fmbr + " failed availability check");
- return fmbr;
+ return null;
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
index 8621af8..fc5faa6 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
@@ -15,7 +15,8 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage {
private Object credentials;
- public JoinRequestMessage(InternalDistributedMember coord, InternalDistributedMember id, Object credentials) {
+ public JoinRequestMessage(InternalDistributedMember coord,
+ InternalDistributedMember id, Object credentials) {
super();
setRecipient(coord);
this.memberID = id;
@@ -58,12 +59,16 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage {
public void toData(DataOutput out) throws IOException {
DataSerializer.writeObject(memberID, out);
DataSerializer.writeObject(credentials, out);
+ // preserve the multicast setting so the receiver can tell
+ // if this is a mcast join request
+ out.writeBoolean(getMulticast());
}
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
memberID = DataSerializer.readObject(in);
credentials = DataSerializer.readObject(in);
+ setMulticast(in.readBoolean());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java
index df2cf0a..ab88849 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java
@@ -17,6 +17,7 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage {
private NetView currentView;
private String rejectionMessage;
private InternalDistributedMember memberID;
+ private Object messengerData;
public JoinResponseMessage(InternalDistributedMember memberID, NetView view) {
this.currentView = view;
@@ -43,6 +44,14 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage {
public String getRejectionMessage() {
return rejectionMessage;
}
+
+ public Object getMessengerData() {
+ return this.messengerData;
+ }
+
+ public void setMessengerData(Object data) {
+ this.messengerData = data;
+ }
@Override
public void process(DistributionManager dm) {
@@ -72,6 +81,7 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage {
DataSerializer.writeObject(currentView, out);
DataSerializer.writeObject(memberID, out);
DataSerializer.writeString(rejectionMessage, out);
+ DataSerializer.writeObject(messengerData, out);
}
@Override
@@ -79,6 +89,7 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage {
currentView = DataSerializer.readObject(in);
memberID = DataSerializer.readObject(in);
rejectionMessage = DataSerializer.readString(in);
+ messengerData = DataSerializer.readObject(in);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 128b2eb..4104833 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_RESPONSE;
import org.apache.logging.log4j.Logger;
import org.jgroups.Address;
@@ -36,6 +37,7 @@ import org.jgroups.ViewId;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.UDP;
import org.jgroups.stack.IpAddress;
+import org.jgroups.util.Digest;
import org.jgroups.util.UUID;
import com.gemstone.gemfire.DataSerializer;
@@ -175,7 +177,13 @@ public class JGroupsMessenger implements Messenger {
if (transport.isMcastEnabled()) {
- // TODO multicast-specific settings
+ properties = replaceStrings(properties, "MCAST_PORT", String.valueOf(transport.getMcastId().getPort()));
+ properties = replaceStrings(properties, "MCAST_ADDRESS", transport.getMcastId().getHost().getHostAddress());
+ properties = replaceStrings(properties, "MCAST_TTL", String.valueOf(dc.getMcastTtl()));
+ properties = replaceStrings(properties, "MCAST_SEND_BUFFER_SIZE", String.valueOf(dc.getMcastSendBufferSize()));
+ properties = replaceStrings(properties, "MCAST_RECV_BUFFER_SIZE", String.valueOf(dc.getMcastRecvBufferSize()));
+ properties = replaceStrings(properties, "MCAST_RETRANSMIT_INTERVAL", ""+Integer.getInteger("gemfire.mcast-retransmit-interval", 500));
+ properties = replaceStrings(properties, "RETRANSMIT_LIMIT", String.valueOf(dc.getUdpFragmentSize()-256));
}
if (transport.isMcastEnabled() || transport.isTcpDisabled() ||
@@ -208,6 +216,10 @@ public class JGroupsMessenger implements Messenger {
}
properties = replaceStrings(properties, "UDP_FRAGMENT_SIZE", ""+dc.getUdpFragmentSize());
+
+ properties = replaceStrings(properties, "FC_MAX_CREDITS", ""+dc.getMcastFlowControl().getByteAllowance());
+ properties = replaceStrings(properties, "FC_THRESHOLD", ""+dc.getMcastFlowControl().getRechargeThreshold());
+ properties = replaceStrings(properties, "FC_MAX_BLOCK", ""+dc.getMcastFlowControl().getRechargeBlockMs());
this.jgStackConfig = properties;
@@ -218,13 +230,17 @@ public class JGroupsMessenger implements Messenger {
// create the configuration XML string for JGroups
String properties = this.jgStackConfig;
+ logger.debug("JGroups configuration: {}", properties);
+
+ long start = System.currentTimeMillis();
+
// start the jgroups channel and establish the membership ID
try {
InputStream is = new ByteArrayInputStream(properties.getBytes("UTF-8"));
myChannel = new JChannel(is);
} catch (Exception e) {
- throw new SystemConnectException("unable to create jgroups channel", e);
+ throw new GemFireConfigException("unable to create jgroups channel", e);
}
try {
@@ -238,11 +254,8 @@ public class JGroupsMessenger implements Messenger {
establishLocalAddress();
- try {
- logger.info("Messenger established the local identity as {} localHost is {}", localAddress, SocketCreator.getLocalHost());
- } catch (UnknownHostException e) {
-
- }
+ logger.info("JGroups channel created (took {}ms)", System.currentTimeMillis()-start);
+
}
@Override
@@ -361,9 +374,11 @@ public class JGroupsMessenger implements Messenger {
throw new DistributedSystemDisconnectedException("Distributed System is shutting down");
}
- filterMessage(msg);
+ filterOutgoingMessage(msg);
- logger.debug("Membership: sending message via JGroups: {} recipients: {}", msg, msg.getRecipientsDescription());
+ if (logger.isDebugEnabled()) {
+ logger.debug("JGroupsMessenger sending [{}] recipients: {}", msg, msg.getRecipientsDescription());
+ }
InternalDistributedMember[] destinations = msg.getRecipients();
boolean allDestinations = msg.forAll();
@@ -378,7 +393,7 @@ public class JGroupsMessenger implements Messenger {
if (useMcast) {
if (logger.isTraceEnabled())
- logger.trace("Membership: sending message via multicast");
+ logger.trace("This message is being multicast");
Exception problem = null;
try {
@@ -443,11 +458,11 @@ public class JGroupsMessenger implements Messenger {
calculatedMembers.add((GMSMember)destinations[i].getNetMember());
}
} // send to explicit list
- Int2ObjectOpenHashMap messages = new Int2ObjectOpenHashMap();
+ Int2ObjectOpenHashMap<Message> messages = new Int2ObjectOpenHashMap<>();
long startSer = theStats.startMsgSerialization();
boolean firstMessage = true;
- for (Iterator it=calculatedMembers.iterator(); it.hasNext(); ) {
- GMSMember mbr = (GMSMember)it.next();
+ for (Iterator<GMSMember> it=calculatedMembers.iterator(); it.hasNext(); ) {
+ GMSMember mbr = it.next();
short version = mbr.getVersionOrdinal();
if ( !messages.containsKey(version) ) {
Message jmsg = createJGMessage(msg, local, version);
@@ -461,9 +476,7 @@ public class JGroupsMessenger implements Messenger {
theStats.endMsgSerialization(startSer);
Collections.shuffle(calculatedMembers);
int i=0;
- for (Iterator<GMSMember> it=calculatedMembers.iterator();
- it.hasNext(); i++) { // send individually
- GMSMember mbr = it.next();
+ for (GMSMember mbr: calculatedMembers) {
JGAddress to = new JGAddress(mbr);
short version = mbr.getVersionOrdinal();
Message jmsg = (Message)messages.get(version);
@@ -472,8 +485,8 @@ public class JGroupsMessenger implements Messenger {
Message tmp = (i < (calculatedLen-1)) ? jmsg.copy(true) : jmsg;
tmp.setDest(to);
tmp.setSrc(this.jgAddress);
- if (logger.isDebugEnabled())
- logger.debug("Membership: Sending {} to '{}' via udp unicast", tmp, mbr);
+ if (logger.isTraceEnabled())
+ logger.trace("Unicasting to {}", to);
myChannel.send(tmp);
}
catch (Exception e) {
@@ -545,6 +558,10 @@ public class JGroupsMessenger implements Messenger {
msg.setFlag(Flag.NO_FC);
msg.setFlag(Flag.SKIP_BARRIER);
}
+ if (gfmsg instanceof DistributedCacheOperation.CacheOperationMessage) {
+ // we don't want to see our own cache operation messages
+ msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
+ }
try {
HeapDataOutputStream out_stream =
new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version));
@@ -573,14 +590,14 @@ public class JGroupsMessenger implements Messenger {
int messageLength = jgmsg.getLength();
- if (logger.isDebugEnabled()) {
- logger.debug("deserializing a message of length "+messageLength);
+ if (logger.isTraceEnabled()) {
+ logger.trace("deserializing a message of length "+messageLength);
}
if (messageLength == 0) {
// jgroups messages with no payload are used for protocol interchange, such
// as STABLE_GOSSIP
- logger.debug("Message length is zero - ignoring");
+ logger.trace("message length is zero - ignoring");
return null;
}
@@ -624,12 +641,40 @@ public class JGroupsMessenger implements Messenger {
/** look for certain messages that may need to be altered before being sent */
- private void filterMessage(DistributionMessage m) {
- if (m instanceof JoinResponseMessage) {
- // TODO: for mcast does the new JGroups need to have the NAKACK digest transmitted
- // to new members at join-time? The old JGroups needs this and it would require us to
- // install an uphandler for JChannel to handle GET_DIGEST_OK events.
- // I (bruce) am postponing looking into this until we move to the new version of jgroups.
+ private void filterOutgoingMessage(DistributionMessage m) {
+ switch (m.getDSFID()) {
+ case JOIN_RESPONSE:
+ JoinResponseMessage jrsp = (JoinResponseMessage)m;
+
+ if (jrsp.getRejectionMessage() != null
+ && services.getConfig().getTransport().isMcastEnabled()) {
+ // get the multicast message digest and pass it with the join response
+ Digest digest = (Digest)this.myChannel.getProtocolStack()
+ .getTopProtocol().down(Event.GET_DIGEST_EVT);
+ jrsp.setMessengerData(digest);
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void filterIncomingMessage(DistributionMessage m) {
+ switch (m.getDSFID()) {
+ case JOIN_RESPONSE:
+ JoinResponseMessage jrsp = (JoinResponseMessage)m;
+
+ if (jrsp.getRejectionMessage() != null
+ && services.getConfig().getTransport().isMcastEnabled()) {
+ Digest digest = (Digest)jrsp.getMessengerData();
+ if (digest != null) {
+ this.myChannel.getProtocolStack()
+ .getTopProtocol().down(new Event(Event.SET_DIGEST, digest));
+ }
+ }
+ break;
+ default:
+ break;
}
}
@@ -693,7 +738,9 @@ public class JGroupsMessenger implements Messenger {
if (services.getManager().shutdownInProgress())
return;
- logger.debug("JGroupsReceiver received {} headers: {}", jgmsg, jgmsg.getHeaders());
+ if (logger.isDebugEnabled()) {
+ logger.debug("JGroupsMessenger received {} headers: {}", jgmsg, jgmsg.getHeaders());
+ }
Object o = readJGMessage(jgmsg);
if (o == null) {
@@ -729,8 +776,9 @@ public class JGroupsMessenger implements Messenger {
}
try {
+ filterIncomingMessage(msg);
MessageHandler h = getMessageHandler(msg);
- logger.debug("Handler for this message is {}", h);
+ logger.trace("Handler for this message is {}", h);
h.processMessage(msg);
}
catch (MemberShunnedException e) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
index 07b59c4..833f677 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@ -769,6 +769,10 @@ public class GMSMembershipManager implements MembershipManager, Manager
private Services services;
+ private boolean mcastEnabled;
+
+ private boolean tcpDisabled;
+
@Override
public boolean isMulticastAllowed() {
@@ -842,13 +846,18 @@ public class GMSMembershipManager implements MembershipManager, Manager
this.wasReconnectingSystem = transport.getIsReconnectingDS();
this.oldDSUDPSocket = (DatagramSocket)transport.getOldDSMembershipInfo();
- if (!config.getDisableTcp()) {
+ // cache these settings for use in send()
+ this.mcastEnabled = transport.isMcastEnabled();
+ this.tcpDisabled = transport.isTcpDisabled();
+
+ if (!this.tcpDisabled) {
dcReceiver = new MyDCReceiver(listener);
}
surpriseMemberTimeout = Math.max(20 * DistributionConfig.DEFAULT_MEMBER_TIMEOUT,
20 * config.getMemberTimeout());
surpriseMemberTimeout = Integer.getInteger("gemfire.surprise-member-timeout", surpriseMemberTimeout).intValue();
+
}
@Override
@@ -857,7 +866,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
RemoteTransportConfig transport = services.getConfig().getTransport();
int dcPort = 0;
- if (!config.getDisableTcp()) {
+ if (!tcpDisabled) {
directChannel = new DirectChannel(this, dcReceiver, config, null);
dcPort = directChannel.getPort();
}
@@ -876,6 +885,8 @@ public class GMSMembershipManager implements MembershipManager, Manager
@Override
public void joinDistributedSystem() {
+ long startTime = System.currentTimeMillis();
+
try {
join();
}
@@ -912,7 +923,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
// in order to debug startup issues we need to announce the membership
// ID as soon as we know it
logger.info(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_entered_into_membership_in_group_0_with_id_1,
- new Object[]{address}));
+ new Object[]{""+(System.currentTimeMillis()-startTime)}));
}
@@ -2136,8 +2147,19 @@ public class GMSMembershipManager implements MembershipManager, Manager
msg.setBreadcrumbsInSender();
Breadcrumbs.setProblem(null);
+
+ boolean useMcast = false;
+ if (mcastEnabled) {
+ useMcast = (msg.getMulticast() || allDestinations);
+ }
- result = directChannelSend(destinations, msg, theStats);
+ if (useMcast || tcpDisabled) {
+ result = services.getMessenger().send(msg);
+ }
+ else {
+ result = directChannelSend(destinations, msg, theStats);
+ }
+
// If the message was a broadcast, don't enumerate failures.
if (allDestinations)
return null;
@@ -2832,13 +2854,6 @@ public class GMSMembershipManager implements MembershipManager, Manager
inhibitForceDisconnectLogging = true;
}
- /**
- * @param uniqueID
- */
- public void setUniqueID(int uniqueID) {
- MemberAttributes.setDefaultVmPid(uniqueID);
- }
-
/** this is a fake message class that is used to flush the serial execution queue */
static class FlushingMessage extends DistributionMessage {
boolean[] done;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c560cb9/gemfire-core/src/main/java/com/gemstone/gemfire/internal/AvailablePort.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/AvailablePort.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/AvailablePort.java
index 29e1f36..e0b9071 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/AvailablePort.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/AvailablePort.java
@@ -100,41 +100,44 @@ public class AvailablePort {
else if (protocol == JGROUPS) {
DatagramSocket socket = null;
try {
- socket = new MulticastSocket();
- socket.setSoTimeout(Integer.getInteger("AvailablePort.timeout", 2000).intValue());
- byte[] buffer = new byte[4];
- buffer[0] = (byte)'p';
- buffer[1] = (byte)'i';
- buffer[2] = (byte)'n';
- buffer[3] = (byte)'g';
- SocketAddress mcaddr = new InetSocketAddress(
- addr==null? DistributionConfig.DEFAULT_MCAST_ADDRESS : addr, port);
- DatagramPacket packet = new DatagramPacket(buffer, 0, buffer.length, mcaddr);
- socket.send(packet);
- try {
- socket.receive(packet);
- packet.getData(); // make sure there's data, but no need to process it
- return false;
- }
- catch (SocketTimeoutException ste) {
- //System.out.println("socket read timed out");
- return true;
- }
- catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- }
- catch (java.io.IOException ioe) {
- if (ioe.getMessage().equals("Network is unreachable")) {
- throw new RuntimeException(LocalizedStrings.AvailablePort_NETWORK_IS_UNREACHABLE.toLocalizedString(), ioe);
- }
- ioe.printStackTrace();
- return false;
- }
- catch (Exception e) {
- e.printStackTrace();
- return false;
+ // TODO - need to find out if anyone is listening on this port
+ return true;
+
+// socket = new MulticastSocket();
+// socket.setSoTimeout(Integer.getInteger("AvailablePort.timeout", 2000).intValue());
+// byte[] buffer = new byte[4];
+// buffer[0] = (byte)'p';
+// buffer[1] = (byte)'i';
+// buffer[2] = (byte)'n';
+// buffer[3] = (byte)'g';
+// SocketAddress mcaddr = new InetSocketAddress(
+// addr==null? DistributionConfig.DEFAULT_MCAST_ADDRESS : addr, port);
+// DatagramPacket packet = new DatagramPacket(buffer, 0, buffer.length, mcaddr);
+// socket.send(packet);
+// try {
+// socket.receive(packet);
+// packet.getData(); // make sure there's data, but no need to process it
+// return false;
+// }
+// catch (SocketTimeoutException ste) {
+// //System.out.println("socket read timed out");
+// return true;
+// }
+// catch (Exception e) {
+// e.printStackTrace();
+// return false;
+// }
+// }
+// catch (java.io.IOException ioe) {
+// if (ioe.getMessage().equals("Network is unreachable")) {
+// throw new RuntimeException(LocalizedStrings.AvailablePort_NETWORK_IS_UNREACHABLE.toLocalizedString(), ioe);
+// }
+// ioe.printStackTrace();
+// return false;
+// }
+// catch (Exception e) {
+// e.printStackTrace();
+// return false;
}
finally {
if (socket != null) {