You are viewing a plain-text version of this content. (The link to the canonical version was lost when it was converted to plain text.)
Posted to commits@geode.apache.org by bs...@apache.org on 2015/08/06 00:04:45 UTC
[2/2] incubator-geode git commit: Enabling creation of stand-alone GMSLocator & adding 2-phase view casting
Enabling creation of stand-alone GMSLocator & adding 2-phase view casting
The locator work lets us unit-test this component in isolation. The
view-casting work prepares for the upcoming health-monitor implementation
and will allow us to start testing high-availability (HA) features.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/e71780ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/e71780ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/e71780ae
Branch: refs/heads/feature/GEODE-77
Commit: e71780aea47cded85a98a9eb07a864e77444ff99
Parents: 04cc2d9
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Wed Jul 29 16:50:57 2015 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Wed Aug 5 15:02:26 2015 -0700
----------------------------------------------------------------------
.../cache/util/BoundedLinkedHashMap.java | 1 -
.../internal/InternalDistributedSystem.java | 4 +-
.../membership/InternalDistributedMember.java | 21 +-
.../internal/membership/NetMember.java | 38 +-
.../internal/membership/NetView.java | 110 +++++
.../internal/membership/gms/GMSMember.java | 60 ++-
.../membership/gms/GMSMemberFactory.java | 13 +-
.../membership/gms/GMSMemberServices.java | 260 ------------
.../internal/membership/gms/GMSUtil.java | 2 +-
.../internal/membership/gms/Services.java | 260 ++++++++++++
.../membership/gms/auth/GMSAuthenticator.java | 4 +-
.../membership/gms/fd/GMSHealthMonitor.java | 19 +-
.../gms/interfaces/HealthMonitor.java | 24 +-
.../membership/gms/interfaces/Manager.java | 4 +
.../membership/gms/interfaces/Service.java | 4 +-
.../membership/gms/locator/GMSLocator.java | 18 +-
.../membership/gms/membership/GMSJoinLeave.java | 418 +++++++++++++++----
.../membership/gms/messages/ViewAckMessage.java | 21 +-
.../gms/messenger/AddressManager.java | 5 +-
.../membership/gms/messenger/JGAddress.java | 17 +
.../gms/messenger/JGroupsMessenger.java | 69 +--
.../gms/mgr/GMSMembershipManager.java | 28 +-
.../internal/tcpserver/TcpClient.java | 2 +
.../gemfire/internal/i18n/LocalizedStrings.java | 4 +
.../gemfire/internal/logging/LogService.java | 8 +-
.../internal/logging/log4j/log4j2-default.xml | 2 +-
.../AnalyzeSerializablesJUnitTest.java | 31 --
.../membership/MembershipJUnitTest.java | 268 +++---------
.../sanctionedDataSerializables.txt | 28 --
.../codeAnalysis/sanctionedSerializables.txt | 111 -----
30 files changed, 980 insertions(+), 874 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e71780ae/gemfire-core/src/main/java/com/gemstone/gemfire/cache/util/BoundedLinkedHashMap.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/util/BoundedLinkedHashMap.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/util/BoundedLinkedHashMap.java
index 5397381..1abf24f 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/util/BoundedLinkedHashMap.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/util/BoundedLinkedHashMap.java
@@ -18,7 +18,6 @@ import java.util.Map;
*
* @since 4.2
*/
-@Deprecated
public class BoundedLinkedHashMap extends LinkedHashMap
{
private static final long serialVersionUID = -3419897166186852692L;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e71780ae/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
index 58e8884..52ee2d0 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
@@ -55,7 +55,7 @@ import com.gemstone.gemfire.distributed.internal.locks.GrantorRequestProcessor;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
import com.gemstone.gemfire.distributed.internal.membership.QuorumChecker;
-import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMemberServices;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager;
import com.gemstone.gemfire.i18n.LogWriterI18n;
import com.gemstone.gemfire.internal.Assert;
@@ -561,7 +561,7 @@ public final class InternalDistributedSystem
this.securityLogWriter.fine("SecurityLogWriter is created.");
}
- GMSMemberServices.setSecurityLogWriter(this.securityLogWriter);
+ Services.setSecurityLogWriter(this.securityLogWriter);
this.clock = new DSClock(this.isLoner);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e71780ae/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
index dd9d6f3..d19a23b 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
@@ -60,6 +60,9 @@ public final class InternalDistributedMember
{
private final static long serialVersionUID = -2785249969777296507L;
+ // whether to show NetMember components in toString()
+ private final boolean SHOW_NETMEMBER = Boolean.getBoolean("gemfire.show_netmembers");
+
protected NetMember netMbr; // the underlying member object, e.g. from JGroups
/**
@@ -719,12 +722,6 @@ public final class InternalDistributedMember
break;
}
sb.append(vmStr);
- // for split-brain and security debugging we need to know if the
- // member has the "can't be coordinator" bit set
-// GMSMember jgm = (GMSMember)ipAddr;
-// if (!jgm.getAddress().canBeCoordinator()) {
-// sb.append("<p>");
-// }
sb.append(")");
}
if (netMbr.splitBrainEnabled()) {
@@ -760,6 +757,10 @@ public final class InternalDistributedMember
.append(')');
}
+ if (SHOW_NETMEMBER) {
+ sb.append("[[").append(this.netMbr).append("]]");
+ }
+
// leave out Roles on purpose
// if (netMbr instanceof GMSMember) {
@@ -1075,6 +1076,10 @@ public final class InternalDistributedMember
InternalDataSerializer.getVersionForDataStream(in).ordinal(), attr);
synchPayload();
+
+ if (InternalDataSerializer.getVersionForDataStream(in).compareTo(Version.GFE_90)>=0) {
+ netMbr.readAdditionalData(in);
+ }
}
@@ -1099,6 +1104,10 @@ public final class InternalDistributedMember
}
// write name last to fix bug 45160
DataSerializer.writeString(this.name, out);
+
+ if (InternalDataSerializer.getVersionForDataStream(out).compareTo(Version.GFE_90)>=0) {
+ netMbr.writeAdditionalData(out);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e71780ae/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetMember.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetMember.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetMember.java
index 16d3cc5..8966e0f 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetMember.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetMember.java
@@ -22,17 +22,17 @@ import java.net.InetAddress;
public interface NetMember
{
- public abstract void setAttributes(MemberAttributes args);
+ public void setAttributes(MemberAttributes args);
- public abstract MemberAttributes getAttributes();
+ public MemberAttributes getAttributes();
- public abstract InetAddress getInetAddress();
+ public InetAddress getInetAddress();
- public abstract int getPort();
+ public int getPort();
- public abstract void setPort(int p);
+ public void setPort(int p);
- public abstract boolean isMulticastAddress();
+ public boolean isMulticastAddress();
public short getVersionOrdinal();
@@ -40,13 +40,21 @@ public interface NetMember
* return a flag stating whether the member has network partition detection enabled
* @since 5.6
*/
- public abstract boolean splitBrainEnabled();
+ public boolean splitBrainEnabled();
/**
* return a flag stating whether the member can be the membership coordinator
* @since 5.6
*/
- public abstract boolean preferredForCoordinator();
+ public boolean preferredForCoordinator();
+
+ /**
+ * Set whether this member ID is preferred for coordinator. This
+ * is mostly useful for unit tests because it does not distribute
+ * this status to other members in the distributed system.
+ * @param preferred
+ */
+ public void setPreferredForCoordinator(boolean preferred);
public byte getMemberWeight();
@@ -55,7 +63,7 @@ public interface NetMember
* Excludes channel_name from comparison.
* @return 0 for equality, value less than 0 if smaller, greater than 0 if greater.
*/
- public abstract int compare(NetMember other);
+ public int compare(NetMember other);
/**
* implements the java.lang.Comparable interface
@@ -66,18 +74,18 @@ public interface NetMember
* @exception java.lang.ClassCastException - if the specified object's type prevents it
* from being compared to this Object.
*/
- public abstract int compareTo(Object o);
+ public int compareTo(Object o);
- public abstract boolean equals(Object obj);
+ public boolean equals(Object obj);
- public abstract int hashCode();
+ public int hashCode();
- public abstract String toString();
+ public String toString();
/** write identity information not known by DistributedMember instances */
- public abstract void writeAdditionalData(DataOutput out) throws IOException;
+ public void writeAdditionalData(DataOutput out) throws IOException;
/** read identity information not known by DistributedMember instances */
- public abstract void readAdditionalData(DataInput in) throws ClassNotFoundException, IOException;
+ public void readAdditionalData(DataInput in) throws ClassNotFoundException, IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e71780ae/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
index e6c2b45..2a8f248 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
@@ -11,12 +11,15 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import org.apache.logging.log4j.Logger;
+
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.internal.DataSerializableFixedID;
@@ -196,6 +199,113 @@ public class NetView implements DataSerializableFixedID {
return (firstNonPreferred == null || firstNonPreferred.equals(who));
}
+ /**
+ * returns the weight of the members in this membership view
+ */
+ public int memberWeight() {
+ int result = 0;
+ InternalDistributedMember lead = getLeadMember();
+ for (InternalDistributedMember mbr: this.members) {
+ result += mbr.getNetMember().getMemberWeight();
+ switch (mbr.getVmKind()) {
+ case DistributionManager.NORMAL_DM_TYPE:
+ result += 10;
+ if (lead != null && mbr.equals(lead)) {
+ result += 5;
+ }
+ break;
+ case DistributionManager.LOCATOR_DM_TYPE:
+ result += 3;
+ break;
+ case DistributionManager.ADMIN_ONLY_DM_TYPE:
+ break;
+ default:
+ throw new IllegalStateException("Unknown member type: " + mbr.getVmKind());
+ }
+ }
+ return result;
+ }
+
+ /**
+ * returns the weight of crashed members in this membership view
+ * with respect to the given previous view
+ */
+ public int getCrashedMemberWeight(NetView oldView) {
+ int result = 0;
+ InternalDistributedMember lead = oldView.getLeadMember();
+ for (InternalDistributedMember mbr: this.crashedMembers) {
+ if ( !oldView.contains(mbr)) {
+ continue;
+ }
+ result += mbr.getNetMember().getMemberWeight();
+ switch (mbr.getVmKind()) {
+ case DistributionManager.NORMAL_DM_TYPE:
+ result += 10;
+ if (lead != null && mbr.equals(lead)) {
+ result += 5;
+ }
+ break;
+ case DistributionManager.LOCATOR_DM_TYPE:
+ result += 3;
+ break;
+ case DistributionManager.ADMIN_ONLY_DM_TYPE:
+ break;
+ default:
+ throw new IllegalStateException("Unknown member type: " + mbr.getVmKind());
+ }
+ }
+ return result;
+ }
+
+
+ /**
+ * returns the members of this views crashedMembers collection
+ * that were members of the given view. Admin-only members are
+ * not counted
+ */
+ public List<InternalDistributedMember> getActualCrashedMembers(NetView oldView) {
+ List<InternalDistributedMember> result = new ArrayList(this.crashedMembers.size());
+ InternalDistributedMember lead = oldView.getLeadMember();
+ for (InternalDistributedMember mbr: this.crashedMembers) {
+ if ((mbr.getVmKind() != DistributionManager.ADMIN_ONLY_DM_TYPE)
+ && oldView.contains(mbr)) {
+ result.add(mbr);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * logs the weight of failed members wrt the given previous
+ * view
+ */
+ public void logCrashedMemberWeights(NetView oldView, Logger log) {
+ InternalDistributedMember lead = oldView.getLeadMember();
+ for (InternalDistributedMember mbr: this.crashedMembers) {
+ if ( !oldView.contains(mbr)) {
+ continue;
+ }
+ int mbrWeight = mbr.getNetMember().getMemberWeight();
+ switch (mbr.getVmKind()) {
+ case DistributionManager.NORMAL_DM_TYPE:
+ if (lead != null && mbr.equals(lead)) {
+ mbrWeight += 15;
+ } else {
+ mbrWeight += 5;
+ }
+ break;
+ case DistributionManager.LOCATOR_DM_TYPE:
+ mbrWeight += 3;
+ break;
+ case DistributionManager.ADMIN_ONLY_DM_TYPE:
+ break;
+ default:
+ throw new IllegalStateException("Unknown member type: " + mbr.getVmKind());
+ }
+ log.info(" " + mbr + " had a weight of " + mbrWeight);
+ }
+ }
+
public String toString() {
InternalDistributedMember lead = getLeadMember();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e71780ae/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMember.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMember.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMember.java
index 05c5754..da9ab99 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMember.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMember.java
@@ -20,6 +20,7 @@ import com.gemstone.gemfire.distributed.DurableClientAttributes;
import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes;
import com.gemstone.gemfire.distributed.internal.membership.NetMember;
import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
@@ -36,8 +37,11 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
*
*/
public class GMSMember implements NetMember, DataSerializableFixedID {
+ // whether to show UUID info in toString()
+ private final static boolean SHOW_UUIDS = Boolean.getBoolean("gemfire.show_UUIDs");
+
private int udpPort=0;
- private boolean shouldNotBeCoordinator;
+ private boolean preferredForCoordinator;
private boolean splitBrainEnabled;
private byte memberWeight;
private InetAddress inetAddr;
@@ -84,7 +88,7 @@ public class GMSMember implements NetMember, DataSerializableFixedID {
*/
public GMSMember(GMSMember m) {
udpPort=m.udpPort;
- shouldNotBeCoordinator=m.shouldNotBeCoordinator;
+ preferredForCoordinator=m.preferredForCoordinator;
splitBrainEnabled=m.splitBrainEnabled;
memberWeight=m.memberWeight;
inetAddr=m.inetAddr;
@@ -124,16 +128,22 @@ public class GMSMember implements NetMember, DataSerializableFixedID {
* @param i the hostname, must be for the current host
* @param p the membership listening port
* @param splitBrainEnabled whether the member has network partition detection enabled
- * @param canBeCoordinator whether the member can be group coordinator
- * @param version TODO
+ * @param preferredForCoordinator whether the member can be group coordinator
+ * @param version the member's version ordinal
+ * @param msbs - most significant bytes of UUID
+ * @param lsbs - least significant bytes of UUID
*/
- public GMSMember(InetAddress i, int p, boolean splitBrainEnabled, boolean canBeCoordinator, short version) {
+ public GMSMember(InetAddress i, int p, boolean splitBrainEnabled, boolean preferredForCoordinator,
+ short version,
+ long msbs, long lsbs) {
setAttributes(MemberAttributes.DEFAULT);
this.inetAddr = i;
this.udpPort=p;
this.splitBrainEnabled = splitBrainEnabled;
- this.shouldNotBeCoordinator = !canBeCoordinator;
+ this.preferredForCoordinator = preferredForCoordinator;
this.versionOrdinal = version;
+ this.uuidMSBs = msbs;
+ this.uuidLSBs = lsbs;
}
public int getPort() {
@@ -149,7 +159,11 @@ public class GMSMember implements NetMember, DataSerializableFixedID {
}
public boolean preferredForCoordinator() {
- return !this.shouldNotBeCoordinator;
+ return this.preferredForCoordinator;
+ }
+
+ public void setPreferredForCoordinator(boolean preferred) {
+ this.preferredForCoordinator = preferred;
}
public InetAddress getInetAddress() {
@@ -278,8 +292,12 @@ public class GMSMember implements NetMember, DataSerializableFixedID {
@Override
public String toString() {
StringBuilder sb = new StringBuilder(100);
+ String uuid = SHOW_UUIDS? (";uuid=" + getUUID().toStringLong())
+ : ((this.uuidLSBs == 0 && this.uuidMSBs == 0)? "; no uuid" : "; uuid set");
+
sb.append("GMSMember[addr=").append(inetAddr).append(";port=").append(udpPort)
.append(";processId=").append(processId).append(";name=").append(name)
+ .append(uuid)
.append("]");
return sb.toString();
}
@@ -289,10 +307,6 @@ public class GMSMember implements NetMember, DataSerializableFixedID {
return udpPort;
}
- public boolean isShouldNotBeCoordinator() {
- return shouldNotBeCoordinator;
- }
-
public boolean isSplitBrainEnabled() {
return splitBrainEnabled;
}
@@ -337,10 +351,6 @@ public class GMSMember implements NetMember, DataSerializableFixedID {
this.udpPort = udpPort;
}
- public void setShouldNotBeCoordinator(boolean shouldNotBeCoordinator) {
- this.shouldNotBeCoordinator = shouldNotBeCoordinator;
- }
-
public void setSplitBrainEnabled(boolean splitBrainEnabled) {
this.splitBrainEnabled = splitBrainEnabled;
}
@@ -397,7 +407,7 @@ public class GMSMember implements NetMember, DataSerializableFixedID {
}
static final int SB_ENABLED = 0x01;
- static final int SHOULD_NOT_BE_COORD = 0x02;
+ static final int PREFERRED_FOR_COORD = 0x02;
@Override
public void toData(DataOutput out) throws IOException {
@@ -405,7 +415,7 @@ public class GMSMember implements NetMember, DataSerializableFixedID {
int flags = 0;
if (splitBrainEnabled) flags |= SB_ENABLED;
- if (shouldNotBeCoordinator) flags |= SHOULD_NOT_BE_COORD;
+ if (preferredForCoordinator) flags |= PREFERRED_FOR_COORD;
out.writeInt(flags);
DataSerializer.writeInetAddress(inetAddr, out);
@@ -417,8 +427,10 @@ public class GMSMember implements NetMember, DataSerializableFixedID {
out.writeInt(vmKind);
DataSerializer.writeString(name, out);
DataSerializer.writeStringArray(groups, out);
- out.writeLong(uuidLSBs);
out.writeLong(uuidMSBs);
+ out.writeLong(uuidLSBs);
+// InternalDataSerializer.writeSignedVL(uuidLSBs, out);
+// InternalDataSerializer.writeSignedVL(uuidMSBs, out);
}
@Override
@@ -427,7 +439,7 @@ public class GMSMember implements NetMember, DataSerializableFixedID {
int flags = in.readInt();
this.splitBrainEnabled = (flags & SB_ENABLED) != 0;
- this.shouldNotBeCoordinator = (flags & SHOULD_NOT_BE_COORD) != 0;
+ this.preferredForCoordinator = (flags & PREFERRED_FOR_COORD) != 0;
this.inetAddr = DataSerializer.readInetAddress(in);
this.udpPort = in.readInt();
@@ -440,18 +452,24 @@ public class GMSMember implements NetMember, DataSerializableFixedID {
this.groups = DataSerializer.readStringArray(in);
this.uuidLSBs = in.readLong();
this.uuidMSBs = in.readLong();
+// this.uuidLSBs = InternalDataSerializer.readUnsignedVL(in);
+// this.uuidMSBs = InternalDataSerializer.readUnsignedVL(in);
}
@Override
public void writeAdditionalData(DataOutput out) throws IOException {
- out.writeLong(uuidLSBs);
out.writeLong(uuidMSBs);
+ out.writeLong(uuidLSBs);
+// InternalDataSerializer.writeSignedVL(uuidLSBs, out);
+// InternalDataSerializer.writeSignedVL(uuidMSBs, out);
}
@Override
public void readAdditionalData(DataInput in) throws ClassNotFoundException,
IOException {
- this.uuidLSBs = in.readLong();
this.uuidMSBs = in.readLong();
+ this.uuidLSBs = in.readLong();
+// this.uuidLSBs = InternalDataSerializer.readUnsignedVL(in);
+// this.uuidMSBs = InternalDataSerializer.readUnsignedVL(in);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e71780ae/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMemberFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMemberFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMemberFactory.java
index d92553f..52e8054 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMemberFactory.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMemberFactory.java
@@ -51,7 +51,7 @@ public class GMSMemberFactory implements MemberServices {
*/
public NetMember newNetMember(InetAddress i, int p, boolean splitBrainEnabled,
boolean canBeCoordinator, MemberAttributes attr, short version) {
- GMSMember result = new GMSMember(i, p, splitBrainEnabled, canBeCoordinator, version);
+ GMSMember result = new GMSMember(i, p, splitBrainEnabled, canBeCoordinator, version, 0, 0);
result.setAttributes(attr);
return result;
}
@@ -80,7 +80,7 @@ public class GMSMemberFactory implements MemberServices {
* @return the new NetMember
*/
public NetMember newNetMember(InetAddress i, int p) {
- GMSMember result = new GMSMember(i, p, false, true, Version.CURRENT_ORDINAL);
+ GMSMember result = new GMSMember(i, p, false, true, Version.CURRENT_ORDINAL, 0, 0);
result.setAttributes(getDefaultAttributes());
return result;
}
@@ -113,7 +113,7 @@ public class GMSMemberFactory implements MemberServices {
DistributionConfig config,
RemoteTransportConfig transport, DMStats stats) throws DistributionException
{
- GMSMemberServices services = new GMSMemberServices(listener, config, transport, stats);
+ Services services = new Services(listener, config, transport, stats);
try {
services.init();
services.start();
@@ -121,14 +121,11 @@ public class GMSMemberFactory implements MemberServices {
catch (ConnectionException e) {
throw new DistributionException(LocalizedStrings.JGroupMemberFactory_UNABLE_TO_CREATE_MEMBERSHIP_MANAGER.toLocalizedString(), e);
}
- catch (GemFireConfigException e) {
- throw e;
- }
- catch (SystemConnectException e) {
+ catch (GemFireConfigException | SystemConnectException e) {
throw e;
}
catch (RuntimeException e) {
- GMSMemberServices.getLogger().error("Unexpected problem starting up membership services", e);
+ Services.getLogger().error("Unexpected problem starting up membership services", e);
throw new SystemConnectException("Problem starting up membership services", e);
}
return (MembershipManager)services.getManager();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e71780ae/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMemberServices.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMemberServices.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMemberServices.java
deleted file mode 100755
index d23389f..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMemberServices.java
+++ /dev/null
@@ -1,260 +0,0 @@
-package com.gemstone.gemfire.distributed.internal.membership.gms;
-
-import java.util.Timer;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.CancelCriterion;
-import com.gemstone.gemfire.CancelException;
-import com.gemstone.gemfire.LogWriter;
-import com.gemstone.gemfire.distributed.internal.DMStats;
-import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.distributed.internal.InternalLocator;
-import com.gemstone.gemfire.distributed.internal.membership.DistributedMembershipListener;
-import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
-import com.gemstone.gemfire.distributed.internal.membership.NetView;
-import com.gemstone.gemfire.distributed.internal.membership.gms.membership.GMSJoinLeave;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messenger.JGroupsMessenger;
-import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager;
-import com.gemstone.gemfire.distributed.internal.membership.gms.auth.GMSAuthenticator;
-import com.gemstone.gemfire.distributed.internal.membership.gms.fd.GMSHealthMonitor;
-import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Authenticator;
-import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.HealthMonitor;
-import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinLeave;
-import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Locator;
-import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager;
-import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Messenger;
-import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
-import com.gemstone.gemfire.internal.logging.InternalLogWriter;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
-import com.gemstone.gemfire.security.AuthenticationFailedException;
-
-public class GMSMemberServices {
-
- private static final Logger logger = LogService.getLogger();
-
- private static final ThreadGroup threadGroup = LoggingThreadGroup.createThreadGroup("Membership", logger);
-
- private static InternalLogWriter staticSecurityLogWriter;
-
- final private Manager manager;
- final private JoinLeave joinLeave;
- private Locator locator;
- final private HealthMonitor healthMon;
- final private Messenger messenger;
- final private Authenticator auth;
- final private ServiceConfig config;
- final private DMStats stats;
- final private Stopper cancelCriterion;
-
- private InternalLogWriter securityLogWriter;
-
- private Timer timer = new Timer("Membership Timer", true);
-
-
-
- /**
- * A common logger for membership classes
- */
- public static Logger getLogger() {
- return logger;
- }
-
- /**
- * The thread group for all membership threads
- */
- public static ThreadGroup getThreadGroup() {
- return threadGroup;
- }
-
- /**
- * a timer used for membership tasks
- */
- public Timer getTimer() {
- return this.timer;
- }
-
-
-
- public GMSMemberServices(
- DistributedMembershipListener listener, DistributionConfig config,
- RemoteTransportConfig transport, DMStats stats) {
- this.cancelCriterion = new Stopper();
- this.stats = stats;
- this.config = new ServiceConfig(transport, config);
- this.manager = new GMSMembershipManager(listener);
- this.joinLeave = new GMSJoinLeave();
- this.healthMon = new GMSHealthMonitor();
- this.messenger = new JGroupsMessenger();
- this.auth = new GMSAuthenticator();
- }
-
- protected void init() {
- // InternalDistributedSystem establishes this log writer at boot time
- // TODO fix this so that IDS doesn't know about Services
- securityLogWriter = staticSecurityLogWriter;
- staticSecurityLogWriter = null;
- this.auth.init(this);
- this.messenger.init(this);
- this.manager.init(this);
- this.joinLeave.init(this);
- this.healthMon.init(this);
- InternalLocator l = (InternalLocator)com.gemstone.gemfire.distributed.Locator.getLocator();
- if (l != null) {
- l.getLocatorHandler().setMembershipManager((MembershipManager)this.manager);
- this.locator = (Locator)l.getLocatorHandler();
- }
- }
-
- protected void start() {
- boolean started = false;
- try {
- logger.info("Membership: starting Authenticator");
- this.auth.start();
- logger.info("Membership: starting Messenger");
- this.messenger.start();
- logger.info("Membership: starting JoinLeave");
- this.joinLeave.start();
- logger.info("Membership: starting HealthMonitor");
- this.healthMon.start();
- logger.info("Membership: starting Manager");
- this.manager.start();
- started = true;
- } catch (RuntimeException e) {
- logger.fatal("Unexpected exception while booting membership services", e);
- throw e;
- } finally {
- if (!started) {
- this.manager.stop();
- this.healthMon.stop();
- this.joinLeave.stop();
- this.messenger.stop();
- this.auth.stop();
- }
- }
- this.auth.started();
- this.messenger.started();
- this.joinLeave.started();
- this.healthMon.started();
- this.manager.started();
-
- this.manager.joinDistributedSystem();
- }
-
- public void emergencyClose() {
- }
-
- public void stop() {
- logger.info("Membership: stopping services");
- this.joinLeave.stop();
- this.healthMon.stop();
- this.auth.stop();
- this.messenger.stop();
- this.manager.stop();
- this.timer.cancel();
- }
-
- public static void setSecurityLogWriter(InternalLogWriter writer) {
- staticSecurityLogWriter = writer;
- }
-
- public LogWriter getSecurityLogWriter() {
- return this.securityLogWriter;
- }
-
- public Authenticator getAuthenticator() {
- return auth;
- }
-
- public void installView(NetView v) {
- try {
- auth.installView(v);
- } catch (AuthenticationFailedException e) {
- return;
- }
- if (locator != null) {
- locator.installView(v);
- }
- healthMon.installView(v);
- messenger.installView(v);
- manager.installView(v);
- }
-
- public Manager getManager() {
- return manager;
- }
-
- public Locator getLocator() {
- return locator;
- }
-
- public void setLocator(Locator locator) {
- this.locator = locator;
- }
-
- public JoinLeave getJoinLeave() {
- return joinLeave;
- }
-
- public HealthMonitor getHealthMonitor() {
- return healthMon;
- }
-
- public ServiceConfig getConfig() {
- return this.config;
- }
-
- public Messenger getMessenger() {
- return this.messenger;
- }
-
- public DMStats getStatistics() {
- return this.stats;
- }
-
- public Stopper getCancelCriterion() {
- return this.cancelCriterion;
- }
-
-
-
-
- public static class Stopper extends CancelCriterion {
- volatile String reasonForStopping = null;
-
- public void cancel(String reason) {
- this.reasonForStopping = reason;
- }
-
- @Override
- public String cancelInProgress() {
- return reasonForStopping;
- }
-
- public boolean isCancelInProgress() {
- return cancelInProgress() != null;
- }
-
- @Override
- public RuntimeException generateCancelledException(Throwable e) {
- String reason = cancelInProgress();
- if (reason == null) {
- return null;
- }
- else {
- return new ServicesStoppedException(reasonForStopping, e);
- }
- }
-
- }
-
- public static class ServicesStoppedException extends CancelException {
- private static final long serialVersionUID = 2134474966059876801L;
-
- public ServicesStoppedException(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e71780ae/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java
index 0005ef3..c65f4d0 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java
@@ -14,7 +14,7 @@ import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes;
import com.gemstone.gemfire.internal.SocketCreator;
public class GMSUtil {
- static Logger logger = GMSMemberServices.getLogger();
+ static Logger logger = Services.getLogger();
public static List<InetSocketAddress> parseLocators(String locatorsString, String bindAddress) {
InetAddress addr = null;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e71780ae/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
new file mode 100755
index 0000000..7478ef3
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
@@ -0,0 +1,310 @@
+package com.gemstone.gemfire.distributed.internal.membership.gms;
+
+import java.util.Timer;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.LogWriter;
+import com.gemstone.gemfire.distributed.internal.DMStats;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.InternalLocator;
+import com.gemstone.gemfire.distributed.internal.membership.DistributedMembershipListener;
+import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
+import com.gemstone.gemfire.distributed.internal.membership.NetView;
+import com.gemstone.gemfire.distributed.internal.membership.gms.membership.GMSJoinLeave;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messenger.JGroupsMessenger;
+import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager;
+import com.gemstone.gemfire.distributed.internal.membership.gms.auth.GMSAuthenticator;
+import com.gemstone.gemfire.distributed.internal.membership.gms.fd.GMSHealthMonitor;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Authenticator;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.HealthMonitor;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinLeave;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Locator;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Messenger;
+import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
+import com.gemstone.gemfire.internal.logging.InternalLogWriter;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+import com.gemstone.gemfire.security.AuthenticationFailedException;
+
+/**
+ * Services holds the pluggable components of the GMS membership system
+ * (Manager, JoinLeave, HealthMonitor, Messenger, Authenticator and an
+ * optional Locator) and drives them through their lifecycle:
+ * {@link #init()}, {@link #start()} and {@link #stop()}.
+ */
+public class Services {
+
+  private static final Logger logger = LogService.getLogger();
+
+  private static final ThreadGroup threadGroup = LoggingThreadGroup.createThreadGroup("Membership", logger);
+
+  /**
+   * Security log writer handed over by InternalDistributedSystem at boot
+   * time; consumed and cleared by the next call to {@link #init()}.
+   */
+  private static InternalLogWriter staticSecurityLogWriter;
+
+  private final Manager manager;
+  private final JoinLeave joinLeave;
+  private Locator locator;
+  private final HealthMonitor healthMon;
+  private final Messenger messenger;
+  private final Authenticator auth;
+  private final ServiceConfig config;
+  private final DMStats stats;
+  private final Stopper cancelCriterion;
+
+  private InternalLogWriter securityLogWriter;
+
+  /** daemon timer for membership tasks; cancelled by {@link #stop()} */
+  private final Timer timer = new Timer("Membership Timer", true);
+
+  /**
+   * A common logger for membership classes
+   */
+  public static Logger getLogger() {
+    return logger;
+  }
+
+  /**
+   * The thread group for all membership threads
+   */
+  public static ThreadGroup getThreadGroup() {
+    return threadGroup;
+  }
+
+  /**
+   * a timer used for membership tasks
+   */
+  public Timer getTimer() {
+    return this.timer;
+  }
+
+  /**
+   * Instantiates the membership components. They are initialized by
+   * {@link #init()} and started by {@link #start()}.
+   *
+   * @param listener listener handed to the GMSMembershipManager
+   * @param config the distribution configuration
+   * @param transport transport settings, combined with config into a ServiceConfig
+   * @param stats statistics returned by {@link #getStatistics()}
+   */
+  public Services(
+      DistributedMembershipListener listener, DistributionConfig config,
+      RemoteTransportConfig transport, DMStats stats) {
+    this.cancelCriterion = new Stopper();
+    this.stats = stats;
+    this.config = new ServiceConfig(transport, config);
+    this.manager = new GMSMembershipManager(listener);
+    this.joinLeave = new GMSJoinLeave();
+    this.healthMon = new GMSHealthMonitor();
+    this.messenger = new JGroupsMessenger();
+    this.auth = new GMSAuthenticator();
+  }
+
+  /**
+   * Initializes every component with a reference to this object. If a
+   * Locator is running in this VM, its locator handler is connected to the
+   * membership manager and becomes this node's Locator service.
+   */
+  protected void init() {
+    // InternalDistributedSystem establishes this log writer at boot time
+    // TODO fix this so that IDS doesn't know about Services
+    securityLogWriter = staticSecurityLogWriter;
+    staticSecurityLogWriter = null;
+    this.auth.init(this);
+    this.messenger.init(this);
+    this.manager.init(this);
+    this.joinLeave.init(this);
+    this.healthMon.init(this);
+    InternalLocator l = (InternalLocator)com.gemstone.gemfire.distributed.Locator.getLocator();
+    if (l != null) {
+      l.getLocatorHandler().setMembershipManager((MembershipManager)this.manager);
+      this.locator = (Locator)l.getLocatorHandler();
+    }
+  }
+
+  /**
+   * Starts every component, tells each one that startup has completed and
+   * then has the manager join the distributed system. If a component fails
+   * to start, the components are stopped and the failure propagates.
+   */
+  protected void start() {
+    boolean started = false;
+    try {
+      logger.info("Membership: starting Authenticator");
+      this.auth.start();
+      logger.info("Membership: starting Messenger");
+      this.messenger.start();
+      logger.info("Membership: starting JoinLeave");
+      this.joinLeave.start();
+      logger.info("Membership: starting HealthMonitor");
+      this.healthMon.start();
+      logger.info("Membership: starting Manager");
+      this.manager.start();
+      started = true;
+    } catch (RuntimeException e) {
+      logger.fatal("Unexpected exception while booting membership services", e);
+      throw e;
+    } finally {
+      if (!started) {
+        // an exception escaping this finally block would replace the
+        // original startup failure, so log it instead
+        try {
+          this.manager.stop();
+          this.healthMon.stop();
+          this.joinLeave.stop();
+          this.messenger.stop();
+          this.auth.stop();
+        } catch (RuntimeException e) {
+          logger.warn("Membership: exception stopping services after a failed start", e);
+        }
+      }
+    }
+    this.auth.started();
+    this.messenger.started();
+    this.joinLeave.started();
+    this.healthMon.started();
+    this.manager.started();
+
+    this.manager.joinDistributedSystem();
+  }
+
+  /**
+   * Emergency shutdown hook; currently a no-op.
+   */
+  public void emergencyClose() {
+  }
+
+  /**
+   * Stops every component and cancels the membership timer.
+   */
+  public void stop() {
+    logger.info("Membership: stopping services");
+    this.joinLeave.stop();
+    this.healthMon.stop();
+    this.auth.stop();
+    this.messenger.stop();
+    this.manager.stop();
+    this.timer.cancel();
+  }
+
+  /**
+   * Records a security log writer for the next {@link #init()} to pick up.
+   */
+  public static void setSecurityLogWriter(InternalLogWriter writer) {
+    staticSecurityLogWriter = writer;
+  }
+
+  public LogWriter getSecurityLogWriter() {
+    return this.securityLogWriter;
+  }
+
+  public Authenticator getAuthenticator() {
+    return auth;
+  }
+
+  /**
+   * Passes a new membership view to the authenticator and, if it is
+   * accepted, to the locator (when present), health monitor, messenger and
+   * manager. A view rejected by the authenticator is logged and dropped.
+   */
+  public void installView(NetView v) {
+    try {
+      auth.installView(v);
+    } catch (AuthenticationFailedException e) {
+      logger.warn("Membership: view " + v + " failed authentication and will not be installed", e);
+      return;
+    }
+    if (locator != null) {
+      locator.installView(v);
+    }
+    healthMon.installView(v);
+    messenger.installView(v);
+    manager.installView(v);
+  }
+
+  public Manager getManager() {
+    return manager;
+  }
+
+  public Locator getLocator() {
+    return locator;
+  }
+
+  public void setLocator(Locator locator) {
+    this.locator = locator;
+  }
+
+  public JoinLeave getJoinLeave() {
+    return joinLeave;
+  }
+
+  public HealthMonitor getHealthMonitor() {
+    return healthMon;
+  }
+
+  public ServiceConfig getConfig() {
+    return this.config;
+  }
+
+  public Messenger getMessenger() {
+    return this.messenger;
+  }
+
+  public DMStats getStatistics() {
+    return this.stats;
+  }
+
+  public Stopper getCancelCriterion() {
+    return this.cancelCriterion;
+  }
+
+  /**
+   * CancelCriterion for membership services: cancellation is in progress
+   * once {@link #cancel(String)} has recorded a non-null reason.
+   */
+  public static class Stopper extends CancelCriterion {
+    volatile String reasonForStopping = null;
+
+    public void cancel(String reason) {
+      this.reasonForStopping = reason;
+    }
+
+    @Override
+    public String cancelInProgress() {
+      return reasonForStopping;
+    }
+
+    public boolean isCancelInProgress() {
+      return cancelInProgress() != null;
+    }
+
+    @Override
+    public RuntimeException generateCancelledException(Throwable e) {
+      // read the volatile field once so the null check and the message agree
+      String reason = cancelInProgress();
+      if (reason == null) {
+        return null;
+      }
+      return new ServicesStoppedException(reason, e);
+    }
+  }
+
+  /**
+   * Exception produced by {@link Stopper#generateCancelledException} once
+   * membership services have been cancelled.
+   */
+  public static class ServicesStoppedException extends CancelException {
+    private static final long serialVersionUID = 2134474966059876801L;
+
+    public ServicesStoppedException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e71780ae/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/auth/GMSAuthenticator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/auth/GMSAuthenticator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/auth/GMSAuthenticator.java
index 518e183..c008171 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/auth/GMSAuthenticator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/auth/GMSAuthenticator.java
@@ -2,14 +2,14 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.auth;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.NetView;
-import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMemberServices;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Authenticator;
import com.gemstone.gemfire.security.AuthenticationFailedException;
public class GMSAuthenticator implements Authenticator {
@Override
- public void init(GMSMemberServices s) {
+ public void init(Services s) {
// TODO Auto-generated method stub
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e71780ae/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 4ad271f..274ecd5 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -3,41 +3,32 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.fd;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.NetView;
-import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMemberServices;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.HealthMonitor;
/** Failure Detection */
public class GMSHealthMonitor implements HealthMonitor {
- private GMSMemberServices services;
+ private Services services;
private NetView currentView;
public static void loadEmergencyClasses() {
}
- /* (non-Javadoc)
- * @see com.gemstone.gemfire.distributed.internal.membership.gms.fd.HealthMonitor#messageReceivedFrom(com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember)
- */
@Override
public void contactedBy(InternalDistributedMember sender) {
// TODO Auto-generated method stub
}
- /* (non-Javadoc)
- * @see com.gemstone.gemfire.distributed.internal.membership.gms.fd.HealthMonitor#suspectMember(com.gemstone.gemfire.distributed.DistributedMember, java.lang.String)
- */
@Override
public void suspect(InternalDistributedMember mbr, String reason) {
// TODO Auto-generated method stub
}
- /* (non-Javadoc)
- * @see com.gemstone.gemfire.distributed.internal.membership.gms.fd.HealthMonitor#checkSuspect(com.gemstone.gemfire.distributed.DistributedMember, java.lang.String)
- */
@Override
- public void checkSuspect(DistributedMember mbr, String reason) {
+ public boolean checkIfAvailable(DistributedMember mbr, String reason, boolean initiateRemoval) {
// TODO Auto-generated method stub
-
+ // placeholder: reports every member as available and never initiates removal
+ return true;
}
public void playDead(boolean b) {
@@ -53,7 +44,7 @@ public class GMSHealthMonitor implements HealthMonitor {
}
@Override
- public void init(GMSMemberServices s) {
+ public void init(Services s) {
// TODO Auto-generated method stub
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e71780ae/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java
index 6b268cb..aab95b6 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java
@@ -5,10 +5,28 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM
public interface HealthMonitor extends Service {
- public abstract void contactedBy(InternalDistributedMember sender);
+ /**
+ * Note that this member has been contacted by the given member.
+ * @param sender the member that contacted this one
+ */
+ public void contactedBy(InternalDistributedMember sender);
- public abstract void suspect(InternalDistributedMember mbr, String reason);
+ /**
+ * Initiate, asynchronously, suspicion that the member is no longer available.
+ * @param mbr the member to suspect
+ * @param reason why the member is suspected
+ */
+ public void suspect(InternalDistributedMember mbr, String reason);
- public abstract void checkSuspect(DistributedMember mbr, String reason);
+ /**
+ * Check on the health of the given member, initiating suspicion if it
+ * fails. Return true if the member is found to be available, false
+ * if it isn't.
+ * @param mbr the member to check
+ * @param reason the reason this check is being performed
+ * @param initiateRemoval if the member should be removed if it is not available
+ * @return true if the member is available, false if it is not
+ */
+ public boolean checkIfAvailable(DistributedMember mbr, String reason, boolean initiateRemoval);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e71780ae/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java
index 315b69f..b379d6d 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java
@@ -1,12 +1,14 @@
package com.gemstone.gemfire.distributed.internal.membership.gms.interfaces;
import java.io.NotSerializableException;
+import java.util.Collection;
import java.util.Set;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.NetMember;
+import com.gemstone.gemfire.distributed.internal.membership.NetView;
/**
* Manager presents the GMS services to the outside world and
@@ -29,6 +31,8 @@ public interface Manager extends Service, MessageHandler {
Set<InternalDistributedMember> send(DistributionMessage m) throws NotSerializableException;
void forceDisconnect(String reason);
+
+ void quorumLost(Collection<InternalDistributedMember> failures, NetView view);
void addSurpriseMemberForTesting(DistributedMember mbr, long birthTime);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e71780ae/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Service.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Service.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Service.java
index f414d3f..082ea0a 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Service.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Service.java
@@ -1,14 +1,14 @@
package com.gemstone.gemfire.distributed.internal.membership.gms.interfaces;
import com.gemstone.gemfire.distributed.internal.membership.NetView;
-import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMemberServices;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
/**
* Services in GMS all implement this interface
*
*/
public interface Service {
- void init(GMSMemberServices s);
+ void init(Services s);
/**
* called after all services have been initialized
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e71780ae/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
index 576b250..0eb29a1 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
@@ -26,7 +26,7 @@ import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
import com.gemstone.gemfire.distributed.internal.membership.NetView;
import com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil;
import com.gemstone.gemfire.distributed.internal.membership.gms.NetLocator;
-import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMemberServices;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Locator;
import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager;
import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
@@ -47,7 +47,7 @@ public class GMSLocator implements Locator, NetLocator {
private final boolean networkPartitionDetectionEnabled;
private final String locatorString;
private final List<InetSocketAddress> locators;
- private GMSMemberServices services;
+ private Services services;
private Set<InternalDistributedMember> registrants = new HashSet<InternalDistributedMember>();
@@ -80,10 +80,12 @@ public class GMSLocator implements Locator, NetLocator {
@Override
public void setMembershipManager(MembershipManager mgr) {
- logger.info("Peer locator is connecting to local membership services");
- services = ((GMSMembershipManager)mgr).getServices();
- services.setLocator(this);
- this.view = services.getJoinLeave().getView();
+ // attach to the membership services only once; later calls are ignored
+ if (services == null) {
+ logger.info("Peer locator is connecting to local membership services");
+ services = ((GMSMembershipManager)mgr).getServices();
+ services.setLocator(this);
+ this.view = services.getJoinLeave().getView();
+ }
}
@Override
@@ -152,7 +154,9 @@ public class GMSLocator implements Locator, NetLocator {
}
for (InternalDistributedMember mbr: registrants) {
if (mbr != coord && (coord==null || mbr.compareTo(coord) < 0)) {
- coord = mbr;
+ if (mbr.getNetMember().preferredForCoordinator() || !mbr.getNetMember().splitBrainEnabled()) {
+ coord = mbr;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e71780ae/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 3059890..3aa80ab 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -3,12 +3,21 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.membership;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -21,8 +30,8 @@ import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.NetView;
import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
import com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil;
-import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMemberServices;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinLeave;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler;
import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
@@ -34,7 +43,9 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRe
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.RemoveMemberMessage;
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.ViewAckMessage;
import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.security.AuthenticationFailedException;
/**
@@ -46,22 +57,22 @@ import com.gemstone.gemfire.security.AuthenticationFailedException;
public class GMSJoinLeave implements JoinLeave, MessageHandler {
/** number of times to try joining before giving up */
- private static final int JOIN_ATTEMPTS = Integer.getInteger("geode.join-attempts", 6);
+ private static final int JOIN_ATTEMPTS = Integer.getInteger("gemfire.join-attempts", 4);
/** amount of time to sleep before trying to join after a failed attempt */
- private static final int JOIN_RETRY_SLEEP = Integer.getInteger("geode.join-retry-sleep", 3000);
+ private static final int JOIN_RETRY_SLEEP = Integer.getInteger("gemfire.join-retry-sleep", 3000);
/** amount of time to wait for a view to be acked by all members before performing suspect processing on non-responders */
- private static final int VIEW_INSTALLATION_TIMEOUT = Integer.getInteger("geode.view-ack-timeout", 12500);
+ private static final int VIEW_INSTALLATION_TIMEOUT = Integer.getInteger("gemfire.view-ack-timeout", 12500);
/** stall time to wait for concurrent join/leave/remove requests to be received */
- private static final long MEMBER_REQUEST_COLLECTION_INTERVAL = Long.getLong("geode.member-request-collection-interval", 2000);
+ private static final long MEMBER_REQUEST_COLLECTION_INTERVAL = Long.getLong("gemfire.member-request-collection-interval", 2000);
/** time to wait for a leave request to be transmitted by jgroups */
- private static final long LEAVE_MESSAGE_SLEEP_TIME = Long.getLong("geode.leave-message-sleep-time", 2000);
+ private static final long LEAVE_MESSAGE_SLEEP_TIME = Long.getLong("gemfire.leave-message-sleep-time", 2000);
/** membership logger */
- private static final Logger logger = GMSMemberServices.getLogger();
+ private static final Logger logger = Services.getLogger();
/** the view ID where I entered into membership */
@@ -70,7 +81,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
/** my address */
private InternalDistributedMember localAddress;
- private GMSMemberServices services;
+ private Services services;
private boolean isConnected;
@@ -88,9 +99,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
/** a new view being installed */
private NetView preparedView;
- private List<InetSocketAddress> locators;
+ /** the last view that conflicted with view preparation */
+ private NetView lastConflictingView;
- private InternalDistributedMember leader;
+ private List<InetSocketAddress> locators;
/** a list of join/leave/crashes */
private final List<DistributionMessage> viewRequests = new LinkedList<DistributionMessage>();
@@ -98,9 +110,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
/** collects the response to a join request */
private JoinResponseMessage[] joinResponse = new JoinResponseMessage[1];
- private ViewReplyProcessor viewResponses = new ViewReplyProcessor();
+ private ViewReplyProcessor viewResponses = new ViewReplyProcessor(false);
+
+ private ViewReplyProcessor prepareResponses = new ViewReplyProcessor(true);
- private boolean disableForcedDisconnect = false;
+ private boolean quorumRequired = false;
+
+ private int viewAckTimeout;
/** background thread that creates new membership views */
private ViewCreator viewCreator;
@@ -189,7 +205,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
GMSMember me = (GMSMember)this.localAddress.getNetMember();
GMSMember o = (GMSMember)response.getMemberID().getNetMember();
me.setSplitBrainEnabled(o.isSplitBrainEnabled());
- me.setShouldNotBeCoordinator(o.isShouldNotBeCoordinator());
+ me.setPreferredForCoordinator(o.preferredForCoordinator());
installView(response.getCurrentView());
return true;
}
@@ -205,12 +221,24 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
* @param incomingRequest
*/
private void processJoinRequest(JoinRequestMessage incomingRequest) {
+ if (incomingRequest.getMemberID().getVersionObject().compareTo(Version.CURRENT) < 0) {
+ logger.warn("detected an attempt to start a peer using an older version of the product {}",
+ incomingRequest.getMemberID());
+ JoinResponseMessage m = new JoinResponseMessage("Rejecting the attempt of a member using an older version");
+ m.setRecipient(incomingRequest.getMemberID());
+ try {
+ services.getMessenger().send(m);
+ } catch (IOException e) {
+ //ignore - the attempt has been logged and the member can't join
+ }
+ return;
+ }
Object creds = incomingRequest.getCredentials();
if (creds != null) {
String rejection = null;
try {
rejection = services.getAuthenticator().authenticate(incomingRequest.getMemberID(), creds);
- } catch (AuthenticationFailedException e) {
+ } catch (Exception e) {
rejection = e.getMessage();
}
if (rejection != null && rejection.length() > 0) {
@@ -382,47 +410,66 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
+
+ /**
+ * Sends the view as a prepare message and waits for acknowledgements.
+ * @return false if the view could not be sent, a conflicting view was
+ * reported, or (unless shutting down) a member failed to respond
+ */
+ boolean prepareView(NetView view) {
+ return sendView(view, true, this.prepareResponses);
+ }
+
void sendView(NetView view) {
- // TODO two-phase view installation and network partition detection
- // send a prepared view, check for conflicting prepared view
- // examine members that don't respond & recreate view and start over
+ sendView(view, false, this.viewResponses);
+ }
+
+
+ /**
+ * Sends the view to its members and its crashed members. When preparing,
+ * waits for acknowledgements from the view's members (crashed members are
+ * not expected to reply) and returns false if a conflicting view was
+ * reported or, unless shutting down, a member failed to respond.
+ * In either mode returns false if the message could not be sent.
+ */
+ boolean sendView(NetView view, boolean preparing, ViewReplyProcessor rp) {
int id = view.getViewId();
- InstallViewMessage msg = new InstallViewMessage(view, services.getAuthenticator().getCredentials());
+ InstallViewMessage msg = new InstallViewMessage(view, services.getAuthenticator().getCredentials(), preparing);
Set<InternalDistributedMember> recips = new HashSet<InternalDistributedMember>(view.getMembers());
recips.addAll(view.getCrashedMembers());
msg.setRecipients(recips);
- this.viewResponses.initialize(id, recips);
- logger.info("View Creator sending new view " + view);
+ // crashed members are told about the view but will not acknowledge it;
+ // expecting replies from them would make every preparation time out
+ rp.initialize(id, new HashSet<InternalDistributedMember>(view.getMembers()));
+ logger.info("View Creator " + (preparing? "preparing" : "sending") + " new view " + view);
try {
services.getMessenger().send(msg);
}
catch (IOException e) {
logger.warn("Unsuccessful in installing new membership view", e);
+ return false;
}
-
- Set<InternalDistributedMember> failedToRespond = this.viewResponses.waitForResponses();
-
- logger.info("View Creator is finished waiting for responses to view change");
- if (!failedToRespond.isEmpty() && (services.getCancelCriterion().cancelInProgress() == null)) {
- logger.warn("these members failed to respond to the view change: " + failedToRespond);
+ // only wait for responses during preparation
+ if (preparing) {
+ Set<InternalDistributedMember> failedToRespond = rp.waitForResponses();
+
+ logger.info("View Creator is finished waiting for responses to view preparation");
+
+ InternalDistributedMember conflictingViewSender = rp.getConflictingViewSender();
+ NetView conflictingView = rp.getConflictingView();
+ if (conflictingView != null) {
+ logger.warn("View Creator received a conflicting membership view from " + conflictingViewSender
+ + " during preparation: " + conflictingView);
+ return false;
+ }
+
+ if (!failedToRespond.isEmpty() && (services.getCancelCriterion().cancelInProgress() == null)) {
+ logger.warn("these members failed to respond to the view change: " + failedToRespond);
+ return false;
+ }
}
+
+ return true;
}
private void processViewMessage(InstallViewMessage m) {
NetView view = m.getView();
+
if (currentView != null && view.getViewId() < currentView.getViewId()) {
// ignore old views
ackView(m);
return;
}
- if (currentView != null && !m.isPreparing() && !view.contains(this.localAddress)) {
- services.getManager().forceDisconnect("I am no longer in the membership view");
- return;
- }
if (m.isPreparing()) {
if (this.preparedView != null && this.preparedView.getViewId() >= view.getViewId()) {
@@ -432,30 +479,44 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
logger.info("unable to send view response to " + m.getSender(), e);
}
}
+ else {
+ this.preparedView = view;
+ ackView(m);
+ }
}
-
- if (view.contains(view.getCreator())) {
- ackView(m);
+ else { // !preparing
+ if (currentView != null && !view.contains(this.localAddress)) {
+ if (quorumRequired) {
+ services.getManager().forceDisconnect("This node is no longer in the membership view");
+ }
+ }
+ else {
+ ackView(m);
+ installView(view);
+ }
}
-
- // process the view
- installView(view);
}
private void ackView(InstallViewMessage m) {
- // send an acknowledgement
- try {
- services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId()));
- } catch (IOException e) {
- logger.info("unable to send view response to " + m.getSender(), e);
+ // acknowledge only views whose creator is a member of the view; the ack
+ // carries m.isPreparing() so prepare and install acks can be told apart
+ if (m.getView().contains(m.getView().getCreator())) {
+ try {
+ services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId(), m.isPreparing()));
+ } catch (IOException e) {
+ logger.info("unable to send view response to " + m.getSender(), e);
+ }
}
}
private void processViewAckMessage(ViewAckMessage m) {
- this.viewResponses.processViewResponse(m.getViewId(), m.getSender());
+ // prepare-phase acks and install-phase acks are tracked by separate reply
+ // processors; any alternate view reported by the sender is passed along
+ if (m.isPrepareAck()) {
+ this.prepareResponses.processViewResponse(m.getViewId(), m.getSender(), m.getAlternateView());
+ } else {
+ this.viewResponses.processViewResponse(m.getViewId(), m.getSender(), m.getAlternateView());
+ }
}
+
/**
* This contacts the locators to find out who the current coordinator is.
* All locators are contacted. If they don't agree then we choose the oldest
@@ -468,31 +529,45 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
String bindAddr = dconfig.getBindAddress();
locators = GMSUtil.parseLocators(dconfig.getLocators(), bindAddr);
}
+
assert this.localAddress != null;
+
FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress);
Set<InternalDistributedMember> coordinators = new HashSet<InternalDistributedMember>();
- for (InetSocketAddress addr: locators) {
- try {
- Object o = TcpClient.requestToServer(
- addr.getAddress(), addr.getPort(), request, services.getConfig().getJoinTimeout(),
- true);
- FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse)o : null;
- if (response != null && response.getCoordinator() != null) {
- coordinators.add(response.getCoordinator());
- GMSMember mbr = (GMSMember)this.localAddress.getNetMember();
- services.getConfig().setNetworkPartitionDetectionEnabled(response.isNetworkPartitionDetectionEnabled());
- if (response.isUsePreferredCoordinators()
- && localAddress.getVmKind() != DistributionManager.LOCATOR_DM_TYPE) {
- mbr.setShouldNotBeCoordinator(true);
+ // Poll every locator. If none of them names a coordinator, pause and poll
+ // again until one does or the locator wait time (in seconds) has elapsed.
+ long giveUpTime = System.currentTimeMillis() + (services.getConfig().getLocatorWaitTime() * 1000L);
+ boolean anyResponses = false;
+
+ do {
+ for (InetSocketAddress addr: locators) {
+ try {
+ Object o = TcpClient.requestToServer(
+ addr.getAddress(), addr.getPort(), request, services.getConfig().getJoinTimeout(),
+ true);
+ FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse)o : null;
+ if (response != null && response.getCoordinator() != null) {
+ anyResponses = true;
+ coordinators.add(response.getCoordinator());
+ GMSMember mbr = (GMSMember)this.localAddress.getNetMember();
+ services.getConfig().setNetworkPartitionDetectionEnabled(response.isNetworkPartitionDetectionEnabled());
+ if (response.isUsePreferredCoordinators()
+ && localAddress.getVmKind() != DistributionManager.LOCATOR_DM_TYPE) {
+ mbr.setPreferredForCoordinator(false);
+ }
}
+ } catch (IOException | ClassNotFoundException problem) {
+ // an unreachable or misbehaving locator is not fatal; try the others
+ logger.debug("unable to contact locator " + addr, problem);
}
- } catch (IOException problem) {
- } catch (ClassNotFoundException problem) {
}
- }
- if (coordinators.isEmpty()) {
- return null;
- }
+ // only pause when another attempt will actually be made
+ if (!anyResponses && System.currentTimeMillis() < giveUpTime) {
+ try { Thread.sleep(2000); } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return null;
+ }
+ }
+ } while (!anyResponses && System.currentTimeMillis() < giveUpTime);
+
+ // no locator named a coordinator within the wait time
+ if (coordinators.isEmpty()) {
+ return null;
+ }
+
Iterator<InternalDistributedMember> it = coordinators.iterator();
if (coordinators.size() == 1) {
return it.next();
@@ -535,10 +610,19 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
// old view - ignore it
return;
}
- checkForPartition(newView);
+
+ // checkForPartition() is always evaluated (it notifies the manager when
+ // quorum is lost), but the view is only rejected - with a forced
+ // disconnect - when a quorum is required. Without network partition
+ // detection the view must still be installed below.
+ if (checkForPartition(newView)) {
+ if (quorumRequired) {
+ List<InternalDistributedMember> crashes = newView.getActualCrashedMembers(currentView);
+ services.getManager().forceDisconnect(
+ LocalizedStrings.Network_partition_detected.toLocalizedString(crashes.size(), crashes));
+ return;
+ }
+ }
+
currentView = newView;
- leader = newView.getLeadMember();
preparedView = null;
+ lastConflictingView = null;
services.installView(newView);
if (!newView.getCreator().equals(this.localAddress)) {
if (newView.shouldBeCoordinator(this.localAddress)) {
@@ -577,18 +661,33 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
- private void checkForPartition(NetView newView) {
- // compare currentView to newView to see if there's been a network
- // partition event
-
+ /**
+ * Checks whether the new view shows a loss of quorum relative to the current
+ * view. The crashed-member weight that newView reports relative to
+ * currentView is compared with 51% of currentView's total member weight,
+ * truncated to an int. Quorum is considered lost only when the crashed weight
+ * is strictly greater than that threshold. When quorum is lost, the manager
+ * is informed via quorumLost() before returning.
+ *
+ * @param newView the view being installed or proposed
+ * @return true if quorum was lost; false otherwise, including when there is
+ *         no current view yet
+ */
+ private boolean checkForPartition(NetView newView) {
+ if (currentView == null) {
+ return false;
+ }
+ int oldWeight = currentView.memberWeight();
+ int failedWeight = newView.getCrashedMemberWeight(currentView);
+ if (failedWeight > 0) {
+ if (logger.isInfoEnabled()) {
+ newView.logCrashedMemberWeights(currentView, logger);
+ }
+ // NOTE: 51 * oldWeight is already an int, so Math.round() effectively does
+ // nothing; the (int) cast truncates the division (oldWeight 15 -> 7)
+ int failurePoint = (int)(Math.round(51 * oldWeight) / 100.0);
+ if (failedWeight > failurePoint) {
+ services.getManager().quorumLost(newView.getActualCrashedMembers(currentView), currentView);
+ return true;
+ }
+ }
+ return false;
}
/** invoke this under the viewInstallationLock */
private void startCoordinatorServices() {
if (viewCreator == null || viewCreator.isShutdown()) {
- viewCreator = new ViewCreator(Version.CURRENT.getProductName()
- +" Membership View Creator", GMSMemberServices.getThreadGroup());
+ viewCreator = new ViewCreator("GemFire Membership View Creator", Services.getThreadGroup());
viewCreator.setDaemon(true);
viewCreator.start();
}
@@ -707,11 +806,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
@Override
public void disableDisconnectOnQuorumLossForTesting() {
- this.disableForcedDisconnect = true;
+ // quorumRequired guards the forced disconnects on quorum loss and also the
+ // view-preparation phase in createAndSendView.
+ // NOTE(review): init() reassigns quorumRequired from the distribution
+ // config, so calling this before init() has no lasting effect.
+ this.quorumRequired = false;
}
@Override
- public void init(GMSMemberServices s) {
+ public void init(Services s) {
this.services = s;
services.getMessenger().addHandler(JoinRequestMessage.class, this);
services.getMessenger().addHandler(JoinResponseMessage.class, this);
@@ -728,7 +827,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
ackCollectionTimeout = 12437;
}
ackCollectionTimeout = Integer.getInteger("gemfire.VIEW_ACK_TIMEOUT", ackCollectionTimeout).intValue();
-
+ // the ack-collection timeout (overridable with gemfire.VIEW_ACK_TIMEOUT)
+ // bounds each wait for view acknowledgements
+ this.viewAckTimeout = ackCollectionTimeout;
+
+ // a quorum is required only when network partition detection is enabled
+ this.quorumRequired = services.getConfig().getDistributionConfig().getEnableNetworkPartitionDetection();
+
}
@Override
@@ -760,14 +862,31 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
+ /**
+ * Collects acknowledgements for one phase of view distribution
+ * (isPrepareViewProcessor tells which). initialize() sets the view id and the
+ * members expected to reply. processViewResponse() removes each responder
+ * from that set and records any conflicting view it reports.
+ * waitForResponses() waits a bounded time for the replies.
+ */
class ViewReplyProcessor {
volatile int viewId = -1;
volatile Set<InternalDistributedMember> recipients;
+ // last conflicting view reported by a responder, and the member that sent it
+ volatile NetView conflictingView;
+ volatile InternalDistributedMember conflictingViewSender;
+ // true from initialize() until waitForResponses() finishes; replies arriving
+ // outside that window are ignored
+ volatile boolean waiting;
+ final boolean isPrepareViewProcessor;
+
+ ViewReplyProcessor(boolean forPreparation) {
+ this.isPrepareViewProcessor = forPreparation;
+ }
void initialize(int viewId, Set<InternalDistributedMember> recips) {
+ this.waiting = true;
this.viewId = viewId;
+ // forget any conflicting view recorded in an earlier round; otherwise it
+ // would be reported again for this view and could fail every later attempt
+ this.conflictingView = null;
+ this.conflictingViewSender = null;
this.recipients = recips;
}
- void processViewResponse(int viewId, InternalDistributedMember sender) {
+ void processViewResponse(int viewId, InternalDistributedMember sender, NetView conflictingView) {
+ if (!this.waiting) {
+ return;
+ }
+
if (viewId == this.viewId) {
+ if (conflictingView != null) {
+ this.conflictingViewSender = sender;
+ this.conflictingView = conflictingView;
+ }
+ // NOTE(review): recipients is modified here, presumably on a message
+ // handling thread, while waitForResponses() may be reading it; confirm
+ // the set passed to initialize() is safe for concurrent use
Set<InternalDistributedMember> waitingFor = this.recipients;
waitingFor.remove(sender);
if (waitingFor.isEmpty()) {
@@ -776,24 +895,41 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
}
+
}
+ /**
+ * Waits up to viewAckTimeout ms for every recipient to reply. Returns early
+ * once all have replied, when cancellation is in progress, or when the thread
+ * is interrupted. Replies are no longer accepted after this returns.
+ *
+ * @return the members that have not replied (the live recipients set)
+ */
Set<InternalDistributedMember> waitForResponses() {
Set<InternalDistributedMember> result = this.recipients;
- long endOfWait = System.currentTimeMillis() + VIEW_INSTALLATION_TIMEOUT;
- while (System.currentTimeMillis() < endOfWait
- && (services.getCancelCriterion().cancelInProgress() == null)) {
- try {
- synchronized(result) {
- result.wait(1000);
+ long endOfWait = System.currentTimeMillis() + viewAckTimeout;
+ try {
+ while (System.currentTimeMillis() < endOfWait
+ && (services.getCancelCriterion().cancelInProgress() == null)) {
+ try {
+ synchronized(result) {
+ // stop as soon as every recipient has replied rather than always
+ // waiting out the full timeout
+ if (result.isEmpty()) {
+ break;
+ }
+ result.wait(1000);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return result;
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return result;
}
+ } finally {
+ this.waiting = false;
}
return result;
}
+
+ /** @return the conflicting view last recorded by processViewResponse(), or null if none */
+ NetView getConflictingView() {
+ return this.conflictingView;
+ }
+
+ /** @return the member that reported the view returned by getConflictingView(), or null */
+ InternalDistributedMember getConflictingViewSender() {
+ return this.conflictingViewSender;
+ }
+
+ /**
+ * @return the live set of recipients that have not yet replied (not a copy);
+ *         entries are removed from it as replies arrive
+ */
+ Set<InternalDistributedMember> getUnresponsiveMembers() {
+ return this.recipients;
+ }
}
@@ -859,7 +995,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
} // synchronized
if (requests != null && !requests.isEmpty()) {
logger.debug("View Creator is processing {} requests for the next membership view", requests.size());
- createAndSendView(requests);
+ /*boolean success = */createAndSendView(requests);
requests = null;
}
}
@@ -868,7 +1004,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
- void createAndSendView(List<DistributionMessage> requests) {
+ /**
+ * Create a new membership view from the given requests and send it to members
+ * (including crashed members). When a quorum is required, the view is first
+ * prepared. If preparation fails, the view is rebuilt with the members that
+ * could not be verified added to the removals, and prepared again until
+ * preparation succeeds.
+ * @return false if view creation was abandoned because this node is shutting
+ *         down or the thread was interrupted; true once the view has been sent
+ */
+ boolean createAndSendView(List<DistributionMessage> requests) {
List<InternalDistributedMember> joinReqs = new ArrayList<InternalDistributedMember>();
List<InternalDistributedMember> leaveReqs = new ArrayList<InternalDistributedMember>();
List<InternalDistributedMember> removalReqs = new ArrayList<InternalDistributedMember>();
@@ -909,14 +1049,120 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
newView = new NetView(localAddress, viewNumber, mbrs, leaveReqs,
removalReqs);
}
+
for (InternalDistributedMember mbr: joinReqs) {
mbr.setVmViewId(newView.getViewId());
}
// send removal messages before installing the view so we stop
- // getting messages from them
+ // getting messages from members that have been kicked out
sendRemoveMessages(removalReqs, removalReasons, newView);
- sendJoinResponses(joinReqs, newView);
+
+ // we want to always check for quorum loss but don't act on it
+ // unless network-partition-detection is enabled
+ if ( !(checkForPartition(newView) && quorumRequired) ) {
+ sendJoinResponses(joinReqs, newView);
+ }
+
+ if (quorumRequired) {
+ boolean prepared = false;
+ do {
+ if (this.shutdown || Thread.currentThread().isInterrupted()) {
+ return false;
+ }
+ prepared = prepareView(newView);
+ if (!prepared) {
+ Set<InternalDistributedMember> unresponsive = prepareResponses.getUnresponsiveMembers();
+ // preparation can fail only because of a conflicting view, in which
+ // case there may be no unresponsive members to check
+ if (!unresponsive.isEmpty()) {
+ try {
+ removeHealthyMembers(unresponsive);
+ } catch (InterruptedException e) {
+ // abort the view if interrupted, preserving the interrupt status
+ Thread.currentThread().interrupt();
+ shutdown = true;
+ return false;
+ }
+ }
+
+ // size the list from the unresponsive set; currentView is not needed
+ // here and can be null
+ List<InternalDistributedMember> failures = new ArrayList<InternalDistributedMember>(unresponsive.size());
+ failures.addAll(unresponsive);
+
+ NetView conflictingView = prepareResponses.getConflictingView();
+ if (conflictingView != null
+ && !conflictingView.getCreator().equals(localAddress)
+ && conflictingView.getViewId() > newView.getViewId()
+ && (lastConflictingView == null || conflictingView.getViewId() > lastConflictingView.getViewId())) {
+ lastConflictingView = conflictingView;
+ failures.addAll(conflictingView.getCrashedMembers());
+ }
+
+ failures.removeAll(removalReqs);
+ if (failures.size() > 0) {
+ // abort the current view and try again
+ removalReqs.addAll(failures);
+ newView = new NetView(localAddress, newView.getViewId()+1, newView.getMembers(), leaveReqs,
+ removalReqs);
+ }
+ // NOTE(review): if no new failures were found, the same view is
+ // prepared again on the next iteration
+ }
+ } while (!prepared);
+ } // quorumRequired
+
+ lastConflictingView = null;
+
sendView(newView);
+ return true;
+ }
+
+ /**
+ * Performs availability checks on the given members in parallel. Any member
+ * that passes its check is removed from the collection, so only members that
+ * could not be verified remain.
+ * <p>
+ * The collection must be mutable. Checks that have not finished within
+ * viewAckTimeout milliseconds are cancelled, and their members are left in
+ * the collection.
+ *
+ * @param mbrs members that failed to acknowledge a view; healthy ones are removed
+ * @throws InterruptedException if interrupted while waiting for the checks
+ */
+ private void removeHealthyMembers(Collection<InternalDistributedMember> mbrs) throws InterruptedException {
+ if (mbrs.isEmpty()) {
+ // nothing to check, and Executors.newFixedThreadPool rejects a pool size of 0
+ return;
+ }
+ List<Callable<InternalDistributedMember>> checkers = new ArrayList<Callable<InternalDistributedMember>>(mbrs.size());
+
+ for (InternalDistributedMember mbr: mbrs) {
+ final InternalDistributedMember fmbr = mbr;
+ checkers.add(new Callable<InternalDistributedMember>() {
+ @Override
+ public InternalDistributedMember call() throws Exception {
+ // return the member id if it passes health checks, null if it fails
+ logger.info("checking state of member " + fmbr);
+ if (services.getHealthMonitor().checkIfAvailable(fmbr, "Member failed to acknowledge a membership view", false)) {
+ logger.info("member " + fmbr + " passed availability check");
+ return fmbr;
+ }
+ logger.info("member " + fmbr + " failed availability check");
+ return null;
+ }
+ });
+ }
+
+ ExecutorService svc = Executors.newFixedThreadPool(mbrs.size(), new ThreadFactory() {
+ AtomicInteger i = new AtomicInteger();
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(Services.getThreadGroup(), r,
+ "Member verification thread " + i.incrementAndGet());
+ // a stuck check must not keep the JVM alive
+ t.setDaemon(true);
+ return t;
+ }
+ });
+
+ try {
+ // bound the whole wait: tasks still running after viewAckTimeout are cancelled
+ List<Future<InternalDistributedMember>> futures =
+ svc.invokeAll(checkers, viewAckTimeout, TimeUnit.MILLISECONDS);
+
+ for (Future<InternalDistributedMember> future: futures) {
+ try {
+ InternalDistributedMember mbr = future.get();
+ if (mbr != null) {
+ logger.debug("disregarding lack of acknowledgement from {}", mbr);
+ mbrs.remove(mbr);
+ }
+ } catch (java.util.concurrent.CancellationException e) {
+ // the check did not finish in time; the member stays in the collection
+ // TODO should the member be removed if we can't verify it in time?
+ } catch (ExecutionException e) {
+ logger.info("unexpected exception caught during member verification", e);
+ }
+ }
+ } finally {
+ svc.shutdownNow();
+ }
+ }
}