You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/08/21 23:22:26 UTC
[02/51] [partial] incubator-geode git commit: GEODE-77 removing the
old jgroups subproject
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/GossipData.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/GossipData.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/GossipData.java
deleted file mode 100644
index 00d1267..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/GossipData.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: GossipData.java,v 1.2 2004/03/30 06:47:27 belaban Exp $
-
-package com.gemstone.org.jgroups.stack;
-
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.Vector;
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.JChannel;
-import com.gemstone.org.jgroups.JGroupsVersion;
-import com.gemstone.org.jgroups.util.VersionedStreamable;
-
-
-
-/**
- * Encapsulates data sent between GossipServer and GossipClient
- * @author Bela Ban Oct 4 2001
- */
-public class GossipData implements VersionedStreamable {
- private static final long serialVersionUID = 309080226207432135L;
- public static final int REGISTER_REQ = 1;
- public static final int GET_REQ = 2;
- public static final int GET_RSP = 3;
- public static final int GEMFIRE_VERSION = 4;
-
- int type=0;
- String group=null; // REGISTER, GET_REQ and GET_RSP
- Address mbr=null; // REGISTER
- List mbrs=null; // GET_RSP
- short versionOrdinal = -1;
-
- boolean hasDistributedSystem; // GemStoneAddition
- boolean floatingCoordinatorDisabled; // GemStoneAddition
- private boolean networkPartitionDetectionEnabled; // GemStoneAddition
- Vector locators; // GemStoneAddition
- Address localAddress; // GemStoneAddition
-
-
- /** Creates an empty instance; used for externalization. */
- public GossipData() {
-   // nothing to initialise here
- }
-
- /**
-  * Creates a gossip message without the GemStone-specific flags.
-  *
-  * @param type one of the message type constants of this class
-  * @param group the group name
-  * @param mbr the member address
-  * @param mbrs the member list
-  * @param locators the locators to carry in the message; may be null
-  */
- public GossipData(int type, String group, Address mbr, List mbrs, Vector locators) {
-   this.type=type;
-   this.group=group;
-   this.mbr=mbr;
-   this.mbrs=mbrs;
-   // the locators argument used to be accepted but silently dropped
-   this.locators=locators;
- }
-
- /**
-  * GemStoneAddition - full constructor that also carries the GemStone-specific state.
-  *
-  * @param type one of the message type constants of this class
-  * @param group the group name
-  * @param mbr the member address
-  * @param mbrs the member list
-  * @param hasDistributedSystem whether the gossip server has a distributed system
-  * @param floatingDisabled stored as the floatingCoordinatorDisabled flag
-  * @param networkPartitionDetectionEnabled whether splitBrainDetection is enabled
-  * @param locators the locators to carry in the message
-  * @param localAddress the local address to carry in the message
-  */
- public GossipData(int type, String group, Address mbr, List mbrs,
-     boolean hasDistributedSystem, boolean floatingDisabled,
-     boolean networkPartitionDetectionEnabled, Vector locators, Address localAddress) {
-   // core gossip fields
-   this.type = type;
-   this.group = group;
-   this.mbr = mbr;
-   this.mbrs = mbrs;
-
-   // GemStone additions
-   this.hasDistributedSystem = hasDistributedSystem;
-   this.floatingCoordinatorDisabled = floatingDisabled;
-   this.networkPartitionDetectionEnabled = networkPartitionDetectionEnabled;
-   this.locators = locators;
-   this.localAddress = localAddress;
- }
-
-
- /** @return the message type */
- public int getType() {
-   return this.type;
- }
-
- /** @return the group name */
- public String getGroup() {
-   return this.group;
- }
-
- /** @return the member address */
- public Address getMbr() {
-   return this.mbr;
- }
-
- /** @return the member list */
- public List getMbrs() {
-   return this.mbrs;
- }
-
- /** GemStoneAddition @return the hasDistributedSystem flag */
- public boolean getHasDistributedSystem() {
-   return this.hasDistributedSystem;
- }
-
- /** GemStoneAddition @return the floatingCoordinatorDisabled flag */
- public boolean getFloatingCoordinatorDisabled() {
-   return floatingCoordinatorDisabled;
- }
-
- /** GemStoneAddition @return the networkPartitionDetectionEnabled flag */
- public boolean getNetworkPartitionDetectionEnabled() {
-   return networkPartitionDetectionEnabled;
- }
-
- /**
-  * Renders the message type followed by the fields that are relevant for that type.
-  */
- @Override // GemStoneAddition
- public String toString() {
-   StringBuilder text=new StringBuilder("type=").append(type2String(type));
-   if (type == REGISTER_REQ) {
-     text.append(" group=").append(group).append(", mbr=").append(mbr);
-   }
-   else if (type == GET_REQ) {
-     text.append(" group=").append(group);
-   }
-   else if (type == GET_RSP) {
-     text.append(" group=").append(group).append(", mbrs=").append(mbrs)
-         .append(" hasDS=").append(hasDistributedSystem)
-         .append(" coordinator=").append(mbr)
-         .append(" locators=").append(this.locators); // GemStoneAddition
-   }
-   else if (type == GEMFIRE_VERSION && this.versionOrdinal > 0) {
-     // the ordinal is only printed once it holds a positive value
-     text.append(" version ordinal =").append(versionOrdinal);
-   }
-   return text.toString();
- }
-
-
- /**
-  * Translates a message type constant into its symbolic name.
-  *
-  * @param t the message type
-  * @return the name of the constant, or "&lt;unknown(t)&gt;" for any other value
-  */
- public static String type2String(int t) {
-   String name;
-   switch(t) {
-     case REGISTER_REQ:
-       name="REGISTER_REQ";
-       break;
-     case GET_REQ:
-       name="GET_REQ";
-       break;
-     case GET_RSP:
-       name="GET_RSP";
-       break;
-     case GEMFIRE_VERSION:
-       name="GEMFIRE_VERSION";
-       break;
-     default:
-       name="<unknown("+t+")>";
-       break;
-   }
-   return name;
- }
-
-
- /**
-  * Writes the message in the older layout (per the method name, the one preceding
-  * GFE 8.0): the type, then either the version ordinal (GEMFIRE_VERSION messages) or
-  * group, member, member list and the three boolean flags. Locators and the local
-  * address are not written.
-  *
-  * @param out the stream to write to
-  * @throws IOException if writing fails
-  */
- public void toDataPre_GFE_8_0_0_0(DataOutput out) throws IOException {
-   out.writeInt(type);
-   if (type == GEMFIRE_VERSION) {
-     out.writeShort(versionOrdinal);
-     return;
-   }
-   // a null group is written as the empty string
-   String groupName = (group == null) ? "" : group;
-   out.writeUTF(groupName);
-   JChannel.getGfFunctions().writeObject(mbr, out);
-   JChannel.getGfFunctions().writeObject(mbrs, out);
-   out.writeBoolean(hasDistributedSystem);
-   out.writeBoolean(this.floatingCoordinatorDisabled);
-   out.writeBoolean(this.networkPartitionDetectionEnabled);
- }
-
- /**
-  * Writes the message in the current layout: the type, then either the version ordinal
-  * (GEMFIRE_VERSION messages) or group, member, member list, the three boolean flags,
-  * the locators and the local address.
-  *
-  * @param out the stream to write to
-  * @throws IOException if writing fails
-  */
- public void toData(DataOutput out) throws IOException {
-   out.writeInt(type);
-   if (type == GEMFIRE_VERSION) {
-     out.writeShort(versionOrdinal);
-     return;
-   }
-   // a null group is written as the empty string
-   String groupName = (group == null) ? "" : group;
-   out.writeUTF(groupName);
-   JChannel.getGfFunctions().writeObject(mbr, out);
-   JChannel.getGfFunctions().writeObject(mbrs, out);
-   out.writeBoolean(hasDistributedSystem);
-   out.writeBoolean(this.floatingCoordinatorDisabled);
-   out.writeBoolean(this.networkPartitionDetectionEnabled);
-   JChannel.getGfFunctions().writeObject(this.locators, out);
-   JChannel.getGfFunctions().writeObject(this.localAddress, out);
- }
-
- /**
-  * Reads a message in the current layout; the counterpart of toData().
-  *
-  * @param in the stream to read from
-  * @throws IOException if reading fails
-  * @throws ClassNotFoundException if a serialized object cannot be resolved
-  */
- public void fromData(DataInput in) throws IOException,
-     ClassNotFoundException {
-   type=in.readInt();
-   if (type == GEMFIRE_VERSION) {
-     // version messages carry nothing but the ordinal
-     versionOrdinal = in.readShort();
-     return;
-   }
-   group = in.readUTF();
-   mbr = JChannel.getGfFunctions().readObject(in);
-   mbrs = JChannel.getGfFunctions().readObject(in);
-   hasDistributedSystem = in.readBoolean();
-   this.floatingCoordinatorDisabled = in.readBoolean();
-   this.networkPartitionDetectionEnabled = in.readBoolean();
-   this.locators = JChannel.getGfFunctions().readObject(in);
-   this.localAddress = JChannel.getGfFunctions().readObject(in);
- }
-
- /**
-  * Reads a message in the older layout; the counterpart of toDataPre_GFE_8_0_0_0().
-  * Locators and the local address are not part of that layout and are left untouched.
-  *
-  * @param in the stream to read from
-  * @throws IOException if reading fails
-  * @throws ClassNotFoundException if a serialized object cannot be resolved
-  */
- public void fromDataPre_GFE_8_0_0_0(DataInput in) throws IOException,
-     ClassNotFoundException {
-   type=in.readInt();
-   if (type == GEMFIRE_VERSION) {
-     // version messages carry nothing but the ordinal
-     versionOrdinal = in.readShort();
-     return;
-   }
-   group = in.readUTF();
-   mbr = JChannel.getGfFunctions().readObject(in);
-   mbrs = JChannel.getGfFunctions().readObject(in);
-   hasDistributedSystem = in.readBoolean();
-   this.floatingCoordinatorDisabled = in.readBoolean();
-   this.networkPartitionDetectionEnabled = in.readBoolean();
- }
-
- /**
-  * Versions in which the serialized form of this class or its contents has
-  * been changed.
-  */
- private static short[] serializationVersions = new short[]{ JGroupsVersion.GFE_80_ORDINAL };
-
- /**
-  * @return the shared array of version ordinals at which the serialized form changed
-  */
- @Override
- public short[] getSerializationVersions() {
-   return GossipData.serializationVersions;
- }
-
- /**
-  * Stream-based entry point; delegates to toData().
-  *
-  * @param out the stream to write to
-  * @throws IOException if writing fails
-  */
- @Override
- public void writeTo(DataOutputStream out) throws IOException {
-   this.toData(out);
- }
-
- /**
-  * Stream-based entry point; delegates to fromData() and reports an unresolvable
-  * class as an IOException that keeps the original exception as its cause.
-  *
-  * @param in the stream to read from
-  * @throws IOException if reading fails or a serialized class cannot be found
-  */
- @Override
- public void readFrom(DataInputStream in) throws IOException,
-     IllegalAccessException, InstantiationException {
-   try {
-     this.fromData(in);
-   }
-   catch (ClassNotFoundException missingClass) {
-     throw new IOException("Error reading a GossipData structure", missingClass);
-   }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/GossipServer.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/GossipServer.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/GossipServer.java
deleted file mode 100644
index 9e7a847..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/GossipServer.java
+++ /dev/null
@@ -1,633 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: GossipServer.java,v 1.10 2005/06/09 18:31:02 belaban Exp $
-
-package com.gemstone.org.jgroups.stack;
-
-import java.io.Externalizable;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.ConcurrentModificationException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.Vector;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.JGroupsVersion;
-import com.gemstone.org.jgroups.protocols.TCPGOSSIP;
-import com.gemstone.org.jgroups.util.GemFireTracer;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-
-
-/**
- * Maintains a cache of member addresses for each group. There are essentially 2 functions: get the members for
- * a given group and register a new member for a given group. Clients have to periodically renew their
- * registrations (like in JINI leasing), otherwise the cache will be cleaned periodically (oldest entries first).<p>
- * The server should be running at a well-known port. This can be done by for example adding an entry to
- * /etc/inetd.conf on UNIX systems, e.g. <code>gossipsrv stream tcp nowait root /bin/start-gossip-server</code>.
- * <code>gossipsrv</code> has to be defined in /etc/services and <code>start-gossip-server</code> is a script
- * which starts the GossipServer at the well-known port (define in /etc/services). The protocol between GossipServer
- * and GossipClient consists of REGISTER_REQ, GET_MEMBERS_REQ and GET_MEMBERS_RSP protocol data units.<p>
- * The server does not spawn a thread/request, but does all of its processing on the main thread. This should not
- * be a problem as all requests are short-lived. However, the server would essentially cease processing requests
- * if a telnet connected to it.<p>
- * Requires JDK >= 1.3 due to the use of Timer
- * deprecated Use GossipRouter instead // GemStoneAddition - remove deprecation of GossipServer for now. The router has too much baggage.
- * @author Bela Ban Oct 4 2001
- *
- */
-public abstract class GossipServer {
-
- public final static int GOSSIPVERSION = 1002;
- // Don't change it ever. We did NOT send GemFire version in a Gossip request till 1001 version.
- // This GOSSIPVERSION is used in _getVersionForAddress request for getting GemFire version of a GossipServer.
- public final static int OLDGOSSIPVERSION = 1001;
-
- public final static boolean LOCATOR_DISCOVERY_DISABLED = Boolean.getBoolean("gemfire.disable-locator-discovery");
-
- // change this name to prevent gemfire from connecting to versions having a different name
- public static final String CHANNEL_NAME = "GF7";
-
- private static GossipServer instance; // GemStoneAddition: singleton class
-
- /**
- * GemStoneAddition (comment)
- * <p>
- * Table where the keys are {@link String}s (group names) and the
- * values are {@link List}s of {@link com.gemstone.org.jgroups.stack.GossipServer.Entry}s.
- * <p>
- * Since <code>GossipServer.CacheCleaner</code> accesses this object concurrently,
- * updates to this field must be synchronized on the instance.
- *
- * @see #sweep()
- * @see #addMember(String, Address)
- */
- protected final Map groups=new HashMap(); // groupname - list of Entry's
- static long EXPIRY_TIME_DEFAULT = 30000; // GemStoneAddition
- protected long expiry_time=EXPIRY_TIME_DEFAULT; // time (in msecs) until a cache entry expires
- protected CacheCleaner cache_cleaner=null; // task that is periodically invoked to sweep old entries from the cache
- // TODO use SystemTimer?
- protected Timer timer; // start as daemon thread, so we won't block on it upon termination
-
- protected final GemFireTracer log=GemFireTracer.getLog(getClass());
-
- protected boolean floatingCoordinatorDisabled; // GemStoneAddition
- protected boolean networkPartitionDetectionEnabled; // GemStoneAddition
- protected InetAddress bind_address;
- protected int port;
- protected String locatorString;
-
- /** GemStoneAddition - the current membership coordinator */
- protected Address coordinator;
- protected Vector locators;
- protected Address localAddress; // added in 8.0 for bug #30341
- protected Object localAddressSync = new Object();
- protected boolean withDS; // true if there will be a distributed system
-
-
- /**
-  * @return the server most recently registered through init(), or null if none has
-  *         been registered
-  */
- public static GossipServer getInstance() {
-   GossipServer current;
-   synchronized(GossipServer.class) {
-     current = instance;
-   }
-   return current;
- }
-
- /**
-  * Stores the configuration and builds the initial locator list.
-  *
-  * @param port the port of this server
-  * @param expiry_time time (in msecs) until a cache entry expires
-  * @param bind_address the address of this server
-  * @param locatorString locators to start with; null or empty yields an empty list
-  * @param floatingCoordinatorDisabled flag handed out in get-members responses
-  * @param networkPartitionDetectionEnabled flag handed out in get-members responses
-  * @param withDS true if there will be a distributed system
-  */
- public GossipServer(int port, long expiry_time,
-     InetAddress bind_address,
-     String locatorString,
-     boolean floatingCoordinatorDisabled,
-     boolean networkPartitionDetectionEnabled,
-     boolean withDS) {
-   this.port = port;
-   this.bind_address = bind_address;
-   this.expiry_time = expiry_time;
-   this.floatingCoordinatorDisabled = floatingCoordinatorDisabled;
-   this.networkPartitionDetectionEnabled = networkPartitionDetectionEnabled;
-   this.locatorString = locatorString;
-   this.withDS = withDS;
-
-   if (this.locatorString != null && this.locatorString.length() > 0) {
-     this.locators = TCPGOSSIP.createInitialHosts(this.locatorString);
-   }
-   else {
-     this.locators = new Vector();
-   }
- }
-
- /**
-  * Registers this server as the singleton returned by getInstance().
-  */
- public void init() {
-   synchronized(GossipServer.class) {
-     instance = this;
-   }
- }
-
- /**
-  * Recovers gossip state from another locator or from the given state file.
-  *
-  * @param sFile
-  *          a file containing gossipserver state from a past run
-  * @param locatorsString
-  *          a GemFire distributed system locators string
-  */
- public abstract void recover(File sFile, String locatorsString);
-
- /* ----------------------------------- Private methods ----------------------------------- */
-
- /**
-  * Compares the given list with the locators vector and appends any locators that
-  * are not known yet to <code>myLocators</code>.
-  *
-  * @param log not used by this method
-  * @param myLocators the existing locators; modified in place while holding its monitor
-  * @param otherLocators the locators received from someone else, may be null
-  * @return true if otherLocators has different locators than myLocators (a null
-  *         otherLocators counts as different); always false when locator discovery
-  *         is disabled
-  */
- public static boolean processLocators(GemFireTracer log, Vector myLocators, Vector otherLocators) {
-   if (LOCATOR_DISCOVERY_DISABLED) {
-     return false;
-   }
-   if (otherLocators == null) {
-     return true;
-   }
-   synchronized(myLocators) {
-     List newLocators = null;
-     for (Iterator e = otherLocators.iterator(); e.hasNext(); ) {
-       Address a = (Address)e.next();
-       if (!myLocators.contains(a)) {
-         if (newLocators == null) {
-           newLocators = new LinkedList();
-         }
-         newLocators.add(a);
-       }
-     }
-     if (newLocators != null) {
-       myLocators.addAll(newLocators);
-       return true;
-     }
-     // Every received locator is already known, so the two lists differ only when we
-     // know more locators than the sender. The former "==" comparison reported the
-     // opposite of what the contract above promises.
-     return myLocators.size() != otherLocators.size();
-   }
- }
-
- /**
-  * Process the gossip request. Return a gossip response or null if none.
-  *
-  * @param request the request; must be a GossipData
-  * @return the response, or null if the request lacks required fields, is of a type
-  *         the server does not answer, or the handler itself returned null
-  * @throws IllegalStateException if request is null or not a GossipData
-  */
- public synchronized Object processRequest(Object request) {
-   if(!(request instanceof GossipData)) {
-     // also reached for null, which must not be dereferenced while building the message
-     throw new IllegalStateException("Expected gossipData, got "
-         + (request == null ? "null" : request.getClass().toString()));
-   }
-   GossipData gossip = (GossipData) request;
-   String group;
-   Address mbr;
-   boolean differed;
-
-   // GemStoneAddition - changed to trace(); the guard now checks the same level
-   if(log.isTraceEnabled()) log.trace("processing request " + gossip.toString());
-   switch(gossip.getType()) {
-     case GossipData.REGISTER_REQ:
-       group=gossip.getGroup();
-       mbr=gossip.getMbr();
-       if(group == null || mbr == null) {
-         if(log.isErrorEnabled()) log.error(ExternalStrings.GossipServer_GROUP_OR_MEMBER_IS_NULL_CANNOT_REGISTER_MEMBER);
-         return null;
-       }
-       differed = processLocators(log, locators, gossip.locators);
-       return processRegisterRequest(group, mbr, differed);
-
-     case GossipData.GET_REQ:
-       group=gossip.getGroup();
-       mbr=gossip.getMbr();
-       if(group == null) {
-         if(log.isErrorEnabled()) log.error(ExternalStrings.GossipServer_GROUP_IS_NULL_CANNOT_GET_MEMBERSHIP);
-         return null;
-       }
-       differed = processLocators(log, locators, gossip.locators);
-       return processGetRequest(group, mbr, differed); // GemStoneAddition - add mbr to group on a get req to make atomic
-
-     case GossipData.GEMFIRE_VERSION:
-       return processVersionRequest();
-
-     case GossipData.GET_RSP: // should not be received
-       if(log.isWarnEnabled()) log.warn(ExternalStrings.GossipServer_RECEIVED_A_GET_RSP_SHOULD_NOT_BE_RECEIVED_BY_SERVER);
-       return null;
-
-     default:
-       if(log.isWarnEnabled()) log.warn(
-           ExternalStrings.GossipServer_RECEIVED_UNKOWN_GOSSIP_REQUEST_GOSSIP_0
-               .toLocalizedString(gossip));
-       return null;
-   }
- }
-
-
- /** Hook with an empty default implementation. */
- public void endRequest(Object request,long startTime) {
-   // intentionally empty
- }
-
- /** Hook with an empty default implementation. */
- public void endResponse(Object request,long startTime) {
-   // intentionally empty
- }
-
- /**
-  * GemStoneAddition - internet protocol version compatibility check. The check
-  * compares the runtime class of this server's bind address with the runtime class
-  * of the member's IP address.
-  *
-  * @param addr the address of the other member
-  * @return true if the member is using the same version of IP we are, or if the
-  *         comparison cannot be made (no bind address, or addr is null, is not an
-  *         IpAddress, or carries no IP address)
-  */
- private boolean checkCompatibility(Address addr) {
-   // instanceof is false for null, so this also covers a missing address; other
-   // Address implementations used to fail with a ClassCastException here
-   if (this.bind_address == null || !(addr instanceof IpAddress)) {
-     return true;
-   }
-   Object memberIp = ((IpAddress)addr).getIpAddress();
-   if (memberIp == null) {
-     return true;
-   }
-   return this.bind_address.getClass() == memberIp.getClass();
- }
-
- /**
-  * Registers the member with the group, persists the state and builds the reply.
-  * When a distributed system is expected (withDS) the call blocks until the local
-  * address has been set through setLocalAddress().
-  *
-  * @param group the group name
-  * @param mbr the member to register
-  * @param sendLocators whether a copy of the locator list goes into the reply
-  * @return the reply, or null if the thread was interrupted while waiting for the
-  *         local address
-  */
- GossipData processRegisterRequest(String group, Address mbr, boolean sendLocators) {
-   if (!checkCompatibility(mbr)) {
-     log.getLogWriter().warning(ExternalStrings.GossipServer_RECEIVED_REGISTRATION_REQUEST_FROM_MEMBER_USING_INCOMPATIBLE_INTERNET_PROTOCOL_0, mbr);
-   }
-   addMember(group, mbr);
-   persistState();
-   GossipData rsp = new GossipData();
-   if (sendLocators) {
-     rsp.locators = new Vector(this.locators);
-   }
-
-   // bug #30341 - wait until the local address is known before replying to a registration
-   if (withDS) {
-     try {
-       synchronized(this.localAddressSync) {
-         while (this.localAddress == null /*&& this.hasDistributedSystem()*/) {
-           this.localAddressSync.wait();
-         }
-       }
-     } catch (InterruptedException e) {
-       // keep the interrupt visible to the caller instead of swallowing it
-       Thread.currentThread().interrupt();
-       return null;
-     }
-   }
-   rsp.localAddress = this.localAddress;
-
-   return rsp;
- }
-
- /**
-  * Builds the get-members response for a group.
-  * GemStoneAddition - added mbr parameter. Member is added to group on
-  * a get() request to make it atomic. Bug 30341
-  * When a distributed system is expected (withDS) the call blocks until the local
-  * address has been set through setLocalAddress().
-  *
-  * @param group the group name
-  * @param mbr the requesting member; if non-null it is added to the group after the
-  *          member list for the reply has been taken
-  * @param sendLocators whether a copy of the locator list goes into the reply
-  * @return the response, or null if the thread was interrupted while waiting for the
-  *         local address
-  */
- GossipData processGetRequest(String group, Address mbr, boolean sendLocators) {
-   if (!checkCompatibility(mbr)) {
-     log.getLogWriter().warning(ExternalStrings.GossipServer_RECEIVED_GETMEMBERS_REQUEST_FROM_MEMBER_USING_INCOMPATIBLE_INTERNET_PROTOCOL_0, mbr);
-   }
-   List mbrs;
-   synchronized (this) { // fix for bug 30341
-     mbrs=getMembers(group);
-     if (mbr != null) {
-       addMember(group, mbr);
-       persistState();
-     }
-   }
-
-   if (withDS) {
-     // bug #30341 - wait until the local address is known before replying to a registration
-     try {
-       synchronized(this.localAddressSync) {
-         while (this.localAddress == null /*&& this.hasDistributedSystem()*/) {
-           this.localAddressSync.wait();
-         }
-       }
-     } catch (InterruptedException e) {
-       // keep the interrupt visible to the caller instead of swallowing it
-       Thread.currentThread().interrupt();
-       return null;
-     }
-   }
-
-   // hand out a snapshot, as processRegisterRequest does, so the response does not
-   // share the live locator list
-   Vector responseLocators = sendLocators ? new Vector(this.locators) : null;
-   GossipData ret=new GossipData(GossipData.GET_RSP, group, this.coordinator, mbrs,
-       this.hasDistributedSystem(), this.floatingCoordinatorDisabled,
-       this.networkPartitionDetectionEnabled, responseLocators, this.localAddress);
-
-   if(log.isTraceEnabled()) {
-     log.trace("get-members response = " + ret); // GemStoneAddition
-   }
-
-   return ret;
- }
-
- /**
-  * Builds the reply to a version request.
-  *
-  * @return a GEMFIRE_VERSION message carrying JGroupsVersion.CURRENT_ORDINAL
-  */
- GossipData processVersionRequest() {
-   GossipData reply = new GossipData(GossipData.GEMFIRE_VERSION, null, null, null, null);
-   reply.versionOrdinal = JGroupsVersion.CURRENT_ORDINAL;
-
-   boolean fine = log.getLogWriter().fineEnabled();
-   if (fine) {
-     log.getLogWriter().fine(
-         "version response = " + reply.versionOrdinal); // GemStoneAddition
-   }
-   return reply;
- }
-
- /**
-  * Persists the server state. Invoked by the register and get handlers after a
-  * member has been added.
-  */
- public abstract void persistState();
-
-
- /**
-  * Note that it is okay for this to return false if this locator will have a
-  * DS but it has not yet been created.
-  *
-  * @return true if there is a distributed system running in this vm
-  */
- public abstract boolean hasDistributedSystem();
-
- /**
-  * Adds a member to the list for the given group. An unknown group is created with
-  * the member as its first entry. For a known group the member's entry is refreshed
-  * (address replaced, timestamp updated) if present, otherwise a new entry is added.
-  *
-  * @param group The group name. Guaranteed to be non-null
-  * @param mbr The member's address. Guaranteed to be non-null
-  */
- void addMember(String group, Address mbr) {
-   List members;
-   synchronized (groups) { // GemStoneAddition
-     members = (List) groups.get(group);
-   }
-
-   if (members == null) {
-     // first member of a group that was not known before
-     members = newEntryList();
-     members.add(newEntry(mbr));
-     synchronized (groups) { // GemStoneAddition
-       groups.put(group, members);
-     }
-     //GemStoneAddition - set to Trace level
-     if(log.isTraceEnabled()) log.trace("added " + mbr + " to discovery set for " + group + " (new group)");
-     return;
-   }
-
-   Entry existing = findEntry(members, mbr);
-   if (existing != null) {
-     // known member: refresh its registration
-     existing.mbr = mbr;
-     existing.update();
-     //GemStoneAddition - set to Trace level
-     if(log.isTraceEnabled()) log.trace("updated discovery set entry " + existing);
-     return;
-   }
-
-   // new member of a known group
-   Entry added = newEntry(mbr);
-   synchronized (members) {
-     members.add(added);
-   }
-   //GemStoneAddition - set to Trace level
-   if(log.isTraceEnabled()) log.trace("added " + mbr + " to discovery set for " + group);
- }
-
- /**
-  * Factory for the per-group list; addMember() calls it when a group is first seen.
-  *
-  * @return a new list for the entries of one group
-  */
- public abstract EntryList newEntryList();
-
- /**
-  * Factory for a single registration; addMember() calls it for members not yet listed.
-  *
-  * @param mbr the member's address
-  * @return a new entry for the member
-  */
- public abstract Entry newEntry(Address mbr);
-
-
- /**
-  * Test hook for GemFire - see how many members are in the discovery set.
-  *
-  * @return the number of members registered under CHANNEL_NAME, or -1 if that group
-  *         does not exist
-  */
- public int getMemberCount() {
-   List registered = getMembers(CHANNEL_NAME);
-   return (registered == null) ? -1 : registered.size();
- }
-
-
- /**
-  * Returns a snapshot of the member addresses of a group.
-  *
-  * @param group the group name
-  * @return a new list holding the address of every entry, or null if the group is unknown
-  */
- List getMembers(String group) {
-   List entries;
-   synchronized (groups) { // GemStoneAddition
-     entries = (List) groups.get(group);
-   }
-   if (entries == null) {
-     return null;
-   }
-
-   List addresses;
-   synchronized (entries) {
-     addresses = new ArrayList(entries.size());
-     for (Iterator it = entries.iterator(); it.hasNext();) {
-       Entry each = (Entry) it.next();
-       addresses.add(each.mbr);
-     }
-   }
-   return addresses;
- }
-
-
- /**
-  * Looks up the entry of a member while holding the monitor of the list.
-  *
-  * @param mbrs the entries of one group
-  * @param mbr the address to look for
-  * @return the first entry whose non-null address equals mbr, or null if there is none
-  */
- Entry findEntry(List mbrs, Address mbr) {
-   synchronized (mbrs) { // GemStoneAddition
-     for (Iterator it = mbrs.iterator(); it.hasNext();) {
-       Entry candidate = (Entry) it.next();
-       if (candidate.mbr == null) {
-         continue;
-       }
-       if (candidate.mbr.equals(mbr)) {
-         return candidate;
-       }
-     }
-   }
-   return null;
- }
-
-
- /**
-  * Remove expired entries (entries older than expiry_time msec).
-  * Each group list is copied under its monitor and the copy is scanned; expired
-  * entries are then removed from the live list, again under its monitor, so that
-  * removal follows the same locking as every other access to the list. If walking
-  * the group table fails with a ConcurrentModificationException the whole pass is
-  * repeated (bug 31361).
-  */
- synchronized void sweep() {
-   long current_time=System.currentTimeMillis();
-   int num_entries_removed=0;
-
-   boolean done; // GemStoneAddition - fix for bug 31361
-   do {
-     done = true;
-     try {
-       for(Iterator e=groups.keySet().iterator(); e.hasNext();) {
-         String key=(String) e.next();
-         List val=(List) groups.get(key);
-         if(val == null) {
-           continue;
-         }
-         // GemStoneAddition -- don't use a live iterator. Instead,
-         // safely create a copy of the list and iterate over _that_
-         List listCopy;
-         synchronized (val) {
-           listCopy = new ArrayList(val);
-         }
-         for(Iterator it=listCopy.iterator(); it.hasNext();) {
-           Entry entry=(Entry) it.next();
-           if(entry.timestamp + expiry_time >= current_time) {
-             continue; // still fresh
-           }
-           boolean removed;
-           synchronized (val) {
-             removed = val.remove(entry);
-           }
-           if(!removed) {
-             continue; // nothing was taken out, so do not report or count it
-           }
-           long diff=current_time - entry.timestamp;
-           if(log.isTraceEnabled()) log.trace("removed member " + entry +
-               " from discovery set " + key + '(' + diff + " msecs old)");
-           num_entries_removed++;
-         }
-       }
-     } catch (ConcurrentModificationException ex) {
-       done = false;
-     }
-   } while (!done);
-
-   if(num_entries_removed > 0) {
-     if(log.isInfoEnabled()) log.info(ExternalStrings.GossipServer_DONE_REMOVED__0__DISCOVERY_ENTRIES, num_entries_removed);
-   }
- }
-
- /**
-  * Cancels the timer, if one has been installed. Nothing in this class assigns the
-  * timer field, so it may still be null when this is called.
-  */
- public void shutDown() {
-   Timer t = this.timer;
-   if (t != null) {
-     t.cancel();
-   }
- }
-
- /* -------------------------------- End of Private methods ----------------------------------- */
-
- /**
-  * Maintains the member address plus a timestamp. Used by CacheCleaner thread to remove old entries.
-  */
- public static class Entry {
-   public Address mbr=null;
-   public long timestamp=0;
-
-   /**
-    * @param mbr the member's address; the timestamp is set to the current time
-    */
-   public Entry(Address mbr) {
-     this.mbr=mbr;
-     update();
-   }
-
-   /** Sets the timestamp to the current time. */
-   public void update() {
-     timestamp=System.currentTimeMillis();
-   }
-
-   /**
-    * An entry equals itself, an Address equal to its member address, and another
-    * entry whose member address equals its own. Note that the comparison with an
-    * Address is one-directional: this method cannot make the Address equal the entry.
-    */
-   @Override // GemStoneAddition
-   public boolean equals(Object other) {
-     // identity first: without it an entry with a null address was not equal to
-     // itself and could not be removed from a collection
-     if (this == other) {
-       return true;
-     }
-     // GemStoneAddition - this was assuming the argument was an address,
-     // which caused removals on the containing collection to fail
-     if(mbr != null && other != null) {
-       if (other instanceof Address) {
-         return mbr.equals(other);
-       }
-       else if (other instanceof Entry) {
-         return mbr.equals(((Entry)other).mbr);
-       }
-     }
-     return false;
-   }
-
-   /**
-    * Constant hash code: trivially consistent with equals() and unaffected by later
-    * changes to the public mbr field, at the price of degenerate hashing.
-    */
-   @Override // GemStoneAddition
-   public int hashCode() { // GemStoneAddition
-     return 0; // TODO more efficient implementation :-)
-   }
-
-   @Override // GemStoneAddition
-   public String toString() {
-     return "mbr=" + mbr;
-   }
-
-   /**
-    * Replaces the address with a freshly read IpAddress and resets the timestamp.
-    * NOTE(review): this class does not declare Externalizable, so this is a plain
-    * method - confirm how it is invoked.
-    */
-   public void readExternal(ObjectInput in) throws IOException,
-       ClassNotFoundException {
-     this.mbr = new IpAddress();
-     this.mbr.readExternal(in);
-     update();
-   }
-
-   /** Writes the address only; the timestamp is not written. Requires a non-null mbr. */
-   public void writeExternal(ObjectOutput out) throws IOException {
-     mbr.writeExternal(out);
-   }
- }
-
- /**
-  * GemStoneAddition - an ArrayList of Entry objects; one such list holds the members
-  * of a group.
-  */
- public static class EntryList extends ArrayList<Entry> {
-   private static final long serialVersionUID = -3718320921705914128L;
-
-   /** Creates an empty list with the default capacity. */
-   public EntryList() {
-     super();
-   }
-
-   /**
-    * @param size the initial capacity
-    */
-   public EntryList(int size) {
-     super(size);
-   }
- }
-
-
- /**
-  * Timer task that sweeps the cache of the enclosing server, removing entries that
-  * are older than the expiry time.
-  */
- public class CacheCleaner extends TimerTask {
-
-   /** Runs one sweep of the enclosing server. */
-   @Override // GemStoneAddition
-   public void run() {
-     GossipServer.this.sweep();
-   }
-
- }
-
- /**
-  * Records the current membership coordinator, which is handed out in get-members
-  * responses.
-  *
-  * @param coordinator the coordinator to set
-  */
- public void setCoordinator(Address coordinator) {
-   this.coordinator = coordinator;
- }
-
- /**
-  * Publishes the local address and wakes up every request handler that is waiting
-  * for it.
-  *
-  * @param address the address of this member
-  */
- public void setLocalAddress(Address address) {
-   final Object lock = this.localAddressSync;
-   synchronized(lock) {
-     this.localAddress = address;
-     lock.notifyAll();
-   }
- }
-
- /** GemStoneAddition - we don't want this main method
- public static void main(String[] args)
- throws java.net.UnknownHostException {
- String arg;
- int port=7500;
- long expiry_time=30000;
- GossipServer gossip_server=null;
- InetAddress address=null;
- for(int i=0; i < args.length; i++) {
- arg=args[i];
- if("-help".equals(arg)) {
- System.out.println("GossipServer [-port <port>] [-expiry <msecs>] [-bindaddress <address>]");
- return;
- }
- if("-port".equals(arg)) {
- port=Integer.parseInt(args[++i]);
- continue;
- }
- if("-expiry".equals(arg)) {
- expiry_time=Long.parseLong(args[++i]);
- continue;
- }
- if("-bindaddress".equals(arg)) {
- address=InetAddress.getByName(args[++i]);
- continue;
- }
- System.out.println("GossipServer [-port <port>] [-expiry <msecs>]");
- return;
- }
-
- try {
-
- }
- catch(Throwable ex) {
- System.err.println("GossipServer.main(): " + ex);
- }
-
- try {
- gossip_server=new GossipServer(port, expiry_time, address);
- gossip_server.run();
- }
- catch(Exception e) {
- System.err.println("GossipServer.main(): " + e);
- }
- }
- */
-
- /** @return GOSSIPVERSION */
- public static int getCurrentGossipVersion() {
-   return GossipServer.GOSSIPVERSION;
- }
-
- /** @return OLDGOSSIPVERSION */
- public static int getOldGossipVersion() {
-   return GossipServer.OLDGOSSIPVERSION;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/Interval.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/Interval.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/Interval.java
deleted file mode 100644
index a0ef787..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/Interval.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: Interval.java,v 1.1.1.1 2003/09/09 01:24:12 belaban Exp $
-
-package com.gemstone.org.jgroups.stack;
-
-
-/**
- * Manages retransmission timeouts. Always returns the next timeout, until the last timeout in the
- * array is reached. Returns the last timeout from then on, until reset() is called.
- * @author John Giorgiadis
- * @author Bela Ban
- */
-public class Interval {
-  /** index of the timeout that the next call to next() returns */
-  private int next=0;
-  /** the timeouts, in the order in which they are handed out; never empty */
-  private long[] interval=null;
-
-  /**
-   * @param interval the timeouts; the array is kept by reference, not copied
-   * @throws NullPointerException if interval is null
-   * @throws IllegalArgumentException if interval is empty
-   */
-  public Interval(long[] interval) {
-    if (interval == null)
-      throw new NullPointerException("interval must not be null");
-    if (interval.length == 0)
-      throw new IllegalArgumentException("interval must contain at least one timeout");
-    this.interval=interval;
-  }
-
-  /** @return the first timeout; does not affect the position used by next() */
-  public long first() { return interval[0]; }
-
-  /** @return the next interval; once the array is exhausted, the last one every time */
-  public synchronized long next() {
-    if (next >= interval.length)
-      return(interval[interval.length-1]);
-    else
-      return(interval[next++]);
-  }
-
-  /** @return the array this object was created with (not a copy) */
-  public long[] getInterval() { return interval; }
-
-  /** Starts over, so that next() returns the first timeout again. */
-  public synchronized void reset() { next = 0; }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/IpAddress.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/IpAddress.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/IpAddress.java
deleted file mode 100644
index d594026..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/IpAddress.java
+++ /dev/null
@@ -1,780 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: IpAddress.java,v 1.29 2005/11/07 09:44:17 belaban Exp $
-
-package com.gemstone.org.jgroups.stack;
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Global;
-import com.gemstone.org.jgroups.JChannel;
-import com.gemstone.org.jgroups.JGroupsVersion;
-import com.gemstone.org.jgroups.util.GemFireTracer;
-import com.gemstone.org.jgroups.util.StreamableFixedID;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-
-
-
-/**
- * Network-dependent address (Internet). Generated by the bottommost layer of the protocol
- * stack (UDP). Contains an InetAddress and port.
- * @author Bela Ban
- */
-public class IpAddress implements StreamableFixedID,
-        Address {
-    private static final long serialVersionUID = -294637383250428305L;
-
-    protected static final GemFireTracer log=GemFireTracer.getLog(IpAddress.class);
-
-    /**
-     * GemStoneAddition - prefix of the pseudo-role that toData() appends to the
-     * serialized roles array to carry the member weight; fromData() strips it again.
-     */
-    private static final String MEMBER_WEIGHT_PREFIX = "zzzmbrwgt";
-
-    /** GemStoneAddition - when true (the default) toString() prints the host name instead of the numeric address */
-    public static boolean resolve_dns=true;
-
-    private InetAddress ip_addr=null;
-    private int port=0;
-    /** Opaque payload; setRoles()/getRoles() store the serialized roles array here. */
-    private byte[] additional_data=null;
-    /** Cached result of size(short); -1 means "not computed yet". */
-    private transient int size=-1;
-    /** GemStoneAddition - can this member become the group coordinator? */
-    private boolean shouldNotBeCoordinator;
-    /** GemStoneAddition - does this member have split-brain detection enabled? */
-    private boolean splitBrainEnabled;
-    /** GemStoneAddition - member weight; kept within 0..Byte.MAX_VALUE by setMemberWeight() */
-    private byte memberWeight;
-    /** GemStoneAddition - member GemFire version */
-    private transient short version = JGroupsVersion.CURRENT_ORDINAL;
-    /** GemStoneAddition - process id */
-    private int processId;
-    private int vmKind;
-    /** -1 until a birth view id has been assigned; negative values are ignored by compareTo(). */
-    private int birthViewId = -1;
-    private int directPort;
-    private String name;
-    private Object durableClientAttributes;
-    /** GemStoneAddition - cache the result of querying */
-    private transient boolean isMcastAddr;
-    private transient boolean isMcastAddrCached;
-    /**
-     * GemStoneAddition - raw address bytes cached by toDataShort().
-     * NOTE(review): nothing in this class clears the cache when ip_addr is reassigned.
-     */
-    byte[] cachedAddress;
-
-
-    // Used only by Externalization
-    public IpAddress() {
-    }
-
-    /**
-     * Creates an address from a host name or textual IP address. If the lookup fails a
-     * warning is logged and the local host (see setAddressToLocalHost()) is used instead.
-     */
-    public IpAddress(String i, int p) {
-        port=p;
-        try {
-            ip_addr=InetAddress.getByName(i);
-        }
-        catch(Exception e) {
-            if(log.isWarnEnabled()) log.warn("failed to get " + i + ": " + e);
-        }
-        if(this.ip_addr == null)
-            setAddressToLocalHost();
-    }
-
-    /** Creates an address for the given InetAddress (the local host if null) and applies the default GemFire attributes. */
-    public IpAddress(InetAddress i, int p) {
-        ip_addr=i; port=p;
-        if(this.ip_addr == null)
-            setAddressToLocalHost();
-        JChannel.getGfFunctions().setDefaultGemFireAttributes(this); // GemStoneAddition
-    }
-
-    /** Creates an address on the local host with the given port and applies the default GemFire attributes. */
-    public IpAddress(int port) {
-        this.port=port;
-        setAddressToLocalHost();
-        JChannel.getGfFunctions().setDefaultGemFireAttributes(this); // GemStoneAddition
-    }
-
-    /**
-     * Points ip_addr at the local host and refreshes the cached size. Failures are
-     * logged as warnings and otherwise ignored, leaving ip_addr unchanged.
-     */
-    private void setAddressToLocalHost() {
-        try {
-            // GemStoneAddition - use the configured GemFire bind address, if present, as the default
-            String bindAddress = System.getProperty("gemfire.jg-bind-address");
-            if (bindAddress != null && bindAddress.length() > 0) {
-                ip_addr=InetAddress.getByName(bindAddress);
-            }
-            else {
-                ip_addr=JChannel.getGfFunctions().getLocalHost(); // get first NIC found (on multi-homed systems)
-            }
-            recomputeSize();
-        }
-        catch(Exception e) {
-            if(log.isWarnEnabled()) log.warn("caught unexpected exception", e);
-        }
-    }
-
-    /** Discards the cached wire size and recomputes it from the current field values. */
-    private void recomputeSize() {
-        size=-1; // GemStoneAddition
-        setSize(size(version));
-    }
-
-    // GemStoneAddition - defaults to true
-    public boolean preferredForCoordinator() {
-        return !shouldNotBeCoordinator;
-    }
-
-    // GemStoneAddition - defaults to false
-    public boolean splitBrainEnabled() {
-        return this.splitBrainEnabled;
-    }
-
-    /**
-     * GemStoneAddition - member weight. The weight is stored in a signed byte and is
-     * serialized via Byte.parseByte(), so it is clamped to 0..Byte.MAX_VALUE; larger
-     * values previously wrapped to negative numbers and were then silently dropped
-     * by toData().
-     */
-    public void setMemberWeight(int weight) {
-        this.memberWeight = (byte)Math.max(0, Math.min(weight, Byte.MAX_VALUE));
-    }
-
-    // GemStoneAddition - member weight
-    public int getMemberWeight() {
-        return this.memberWeight;
-    }
-
-    /** GemStoneAddition - pids help with debugging quite a bit */
-    public void setProcessId(int pid) {
-        this.processId = pid;
-    }
-
-    /** GemStoneAddition - get the pid if any */
-    public int getProcessId() {
-        return this.processId;
-    }
-
-    /** GemStoneAddition - can this member be the GMS coordinator? */
-    public void shouldntBeCoordinator(boolean shouldNotBe) {
-        this.shouldNotBeCoordinator = shouldNotBe;
-    }
-
-    /** GemStoneAddition - sets whether this member has split-brain detection enabled */
-    public void splitBrainEnabled(boolean enabled) {
-        this.splitBrainEnabled = enabled;
-    }
-
-    /**
-     * GemstoneAddition returns a number that can be used to differentiate two
-     * addresses with the same InetAddress and port. This ID is not used in
-     * equality comparisons so that the coordinator can distinguish between
-     * old and new IDs. Instead, equality comparisons use the birthViewId so
-     * that post-join comparisons of IpAddresses can easily distinguish between
-     * new and old, reused addresses.
-     * @return the direct port if it is non-zero, otherwise the process id
-     */
-    public int getUniqueID() {
-        return this.directPort != 0? this.directPort : this.processId;
-    }
-
-    /**
-     * get the name of this GemFire member (GemStoneAddition)
-     * @return the name, or the empty string if none has been set; never null
-     */
-    public String getName() {
-        return this.name == null? "" : this.name;
-    }
-
-    /**
-     * set the name of this GemFire member (GemStoneAddition); null is stored as
-     * the empty string
-     */
-    public void setName(String v) {
-        // the original tested the field 'name' here instead of the argument, so the
-        // first call on a fresh instance always stored "" and discarded v
-        this.name = (v == null) ? "" : v;
-    }
-
-    /**
-     * get the roles of this GemFire member (GemStoneAddition)
-     * @return the string array decoded from additional_data, or an empty array if
-     *         there is no additional data
-     * @throws RuntimeException if additional_data cannot be decoded
-     */
-    public String[] getRoles() {
-        if (this.additional_data != null) {
-            try {
-                DataInput di = new DataInputStream(new ByteArrayInputStream(this.additional_data));
-                return JChannel.getGfFunctions().readStringArray(di);
-            } catch (Exception e) {
-                throw new RuntimeException("unable to read roles", e);
-            }
-        }
-        return new String[0];
-    }
-
-    /**
-     * set the roles of this GemFire member (GemStoneAddition). An empty array is
-     * ignored: roles stored earlier stay in place.
-     * @throws RuntimeException if the roles cannot be serialized
-     */
-    public void setRoles(String[] roles) {
-        // use additional_data to hold roles
-        if (roles.length > 0) {
-            try {
-                ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                DataOutput dao = new DataOutputStream(baos);
-                JChannel.getGfFunctions().writeStringArray(roles, dao);
-                this.additional_data = baos.toByteArray();
-            } catch (Exception e) {
-                throw new RuntimeException("unable to serialize roles", e);
-            }
-        }
-    }
-
-    /**
-     * set the durable client attributes (GemStoneAddition)
-     */
-    public void setDurableClientAttributes(Object d) {
-        durableClientAttributes = d;
-    }
-
-    /**
-     * get the durable client attributes (GemStoneAddition)
-     */
-    public Object getDurableClientAttributes() {
-        return durableClientAttributes;
-    }
-
-    public final InetAddress getIpAddress() {return ip_addr;}
-    public final int getPort() {return port;}
-
-    public final SocketAddress getSocketAddress() { // GemStoneAddition
-        return new InetSocketAddress(ip_addr, port);
-    }
-
-    /** @return true if ip_addr is a multicast address; the answer is computed once and then cached */
-    public final boolean isMulticastAddress() {
-        if (!isMcastAddrCached) {
-            isMcastAddr = ip_addr != null && ip_addr.isMulticastAddress();
-            isMcastAddrCached = true;
-        }
-        return isMcastAddr;
-    }
-
-    /**
-     * Returns the additional_data.
-     * @return byte[]
-     */
-    public final byte[] getAdditionalData() {
-        return additional_data;
-    }
-
-    /**
-     * Sets the additional_data and refreshes the cached size.
-     * @param additional_data The additional_data to set
-     */
-    public final void setAdditionalData(byte[] additional_data) {
-        this.additional_data = additional_data;
-        recomputeSize();
-    }
-
-    // GemStoneAddition
-    public int getVmKind() {
-        return this.vmKind;
-    }
-
-    // GemStoneAddition
-    public void setVmKind(int vmKind) {
-        this.vmKind = vmKind;
-    }
-
-    // GemStoneAddition
-    public int getDirectPort() {
-        return this.directPort;
-    }
-
-    // GemStoneAddition
-    public void setDirectPort(int directPort) {
-        this.directPort = directPort;
-    }
-
-    // GemStoneAddition
-    public int getBirthViewId() {
-        return this.birthViewId;
-    }
-
-    /** GemStoneAddition - stores the low 31 bits of the view id, so the result is never negative */
-    public void setBirthViewId(long vid) {
-        this.birthViewId = (int)(vid & Integer.MAX_VALUE);
-    }
-
-    public void setBirthViewId(int birthViewId) {
-        this.birthViewId = birthViewId;
-    }
-
-    // GemStoneAddition
-    public final short getVersionOrdinal() {
-        return this.version;
-    }
-
-    // GemStoneAddition
-    public final void setVersionOrdinal(short version) {
-        this.version = version;
-    }
-
-    /**
-     * Establishes an order between 2 addresses. Assumes other contains non-null IpAddress.
-     * Excludes channel_name from comparison.
-     * @return 0 for equality, value less than 0 if smaller, greater than 0 if greater.
-     */
-    public final int compare(IpAddress other) {
-        return compareTo(other);
-    }
-
-
-    /**
-     * implements the java.lang.Comparable interface. Addresses are ordered by the raw
-     * address bytes (compared as signed bytes; a shorter address sorts before a longer
-     * one sharing its prefix), then by port, then by birthViewId if both sides have
-     * one, otherwise by processId if both sides have one. An address without an
-     * InetAddress sorts before any address that has one.
-     * @see java.lang.Comparable
-     * @param o - the Object to be compared
-     * @return a negative integer, zero, or a positive integer as this object is less than,
-     *         equal to, or greater than the specified object.
-     * @exception java.lang.ClassCastException - if the specified object's type prevents it
-     *         from being compared to this Object.
-     */
-    public final int compareTo(Object o) {
-        if(this == o) return 0;
-        if (!(o instanceof IpAddress))
-            throw new ClassCastException("comparison between different classes: the other object is " +
-                (o != null? o.getClass() : o));
-        IpAddress other = (IpAddress) o;
-        if(ip_addr == null) {
-            if (other.ip_addr == null) return port < other.port ? -1 : (port > other.port ? 1 : 0);
-            return -1;
-        }
-        if (other.ip_addr == null) {
-            // mirror image of the branch above; this case used to throw a NullPointerException
-            return 1;
-        }
-
-        // GemStoneAddition - use ipAddress bytes instead of hash, which is really a hash in Ipv6 addresses
-        byte[] myBytes = ip_addr.getAddress();
-        byte[] otherBytes = other.ip_addr.getAddress();
-        int common = Math.min(myBytes.length, otherBytes.length);
-        for (int i = 0; i < common; i++) {
-            if (myBytes[i] < otherBytes[i])
-                return -1;
-            if (myBytes[i] > otherBytes[i])
-                return 1;
-        }
-        // same as far as they go: the shorter address sorts first. The original returned
-        // -1 when this address was the longer one and fell through to the port comparison
-        // when it was the shorter one, so a.compareTo(b) and b.compareTo(a) could disagree.
-        if (myBytes.length != otherBytes.length)
-            return myBytes.length < otherBytes.length ? -1 : 1;
-
-        int comp = ((port < other.port) ? -1
-            : (port > other.port ? 1 : 0));
-        // GemStoneAddition - bug #41983, address of kill-9'd member is reused
-        // before it can be ejected from membership
-        if (comp == 0) {
-            if (this.birthViewId >= 0 && other.birthViewId >= 0) {
-                if (this.birthViewId < other.birthViewId) {
-                    comp = -1;
-                } else if (other.birthViewId < this.birthViewId) {
-                    comp = 1;
-                }
-            } else if (this.processId != 0 && other.processId != 0) {
-                // starting in 8.0 we also consider the processId. During startup
-                // we may have a message from a member that hasn't finished joining
-                // and address canonicalization may find an old address that has
-                // the same addr:port. Since the new member doesn't have a viewId
-                // its address will be equal to the old member's address unless
-                // we also pay attention to the processId.
-                if (this.processId < other.processId){
-                    comp = -1;
-                } else if (other.processId < this.processId) {
-                    comp = 1;
-                }
-            }
-        }
-        return comp;
-    }
-
-    /** Two addresses are equal when compareTo() reports 0 for them. */
-    @Override // GemStoneAddition
-    public final boolean equals(Object obj) {
-        if(this == obj) return true; // added Nov 7 2005, makes sense with canonical addresses
-        if (!(obj instanceof IpAddress)) return false; // GemStoneAddition - also covers null
-        return compareTo(obj) == 0;
-    }
-
-    /** Hash of the InetAddress (if any) plus the port. */
-    @Override // GemStoneAddition
-    public final int hashCode() {
-        return ip_addr != null ? ip_addr.hashCode() + port : port;
-    }
-
-    /**
-     * Renders host, optional "(name:pid&lt;vmKind&gt;)" section, the "&lt;ec&gt;" marker,
-     * the birth view id, a non-current version and finally ":port".
-     */
-    @Override // GemStoneAddition
-    public String toString() {
-        StringBuilder sb=new StringBuilder();
-
-        if(ip_addr == null)
-            sb.append("<null>");
-        else if(ip_addr.isMulticastAddress())
-            sb.append(ip_addr.getHostAddress());
-        else {
-            String host_name;
-            if(resolve_dns) // GemStoneAddition
-                host_name=JChannel.getGfFunctions().getHostName(ip_addr);
-            else
-                host_name=ip_addr.getHostAddress();
-            appendShortName(host_name, sb);
-        }
-        // GemStoneAddition - name and process id. getName() maps an unset (null) name
-        // to "", which keeps the literal text "null" out of the output.
-        String memberName = getName();
-        boolean hasName = memberName.length() > 0;
-        if (hasName || processId > 0) {
-            sb.append('(');
-            if (hasName) {
-                sb.append(memberName);
-                if (processId > 0) {
-                    sb.append(':');
-                }
-            }
-            if (processId > 0) {
-                sb.append(processId);
-            }
-            sb.append(JChannel.getGfFunctions().getVmKindString(vmKind));
-            sb.append(')');
-        }
-        // GemStoneAddition - coordinator inhibition
-        if (this.splitBrainEnabled && !this.shouldNotBeCoordinator) {
-            sb.append("<ec>");
-        }
-        if (this.birthViewId >= 0) {
-            sb.append("<v").append(this.birthViewId).append('>');
-        }
-        if (this.version != JChannel.getGfFunctions().getCurrentVersionOrdinal()) {
-            sb.append("(version:").append(this.version)
-                .append(')');
-        }
-        sb.append(':').append(port);
-        //GemStoneAddition - don't print encoded additional_data
-        return sb.toString();
-    }
-
-    /**
-     * Input: "daddy.nms.fnc.fujitsu.com", output: "daddy". Appends result to 'sb'.
-     * Names that start with a digit (numeric addresses) are appended unchanged.
-     * @param hostname The hostname in long form; null appends nothing
-     * @param sb The builder to which the result is to be appended
-     */
-    private void appendShortName(String hostname, StringBuilder sb) {
-        if(hostname == null) return;
-        int index=hostname.indexOf('.');
-        if(index > 0 && !Character.isDigit(hostname.charAt(0)))
-            sb.append(hostname.substring(0, index));
-        else
-            sb.append(hostname);
-    }
-
-    /**
-     * Externalizable form. Unlike toData() this writes additional_data as-is and does
-     * not piggyback the member weight.
-     */
-    public void writeExternal(ObjectOutput out) throws IOException {
-        if(ip_addr != null) {
-            byte[] address=ip_addr.getAddress();
-            out.writeByte(address.length);
-            out.write(address, 0, address.length);
-        }
-        else {
-            out.writeByte(0);
-        }
-        out.writeShort(port & 0xffff);
-        out.writeInt(processId); // GemStoneAddition
-        out.writeInt(directPort);
-        out.writeByte(vmKind);
-        out.writeInt(this.birthViewId);
-        out.writeUTF(getName());
-        if(additional_data != null) {
-            out.writeInt(additional_data.length);
-            out.write(additional_data, 0, additional_data.length);
-        }
-        else {
-            out.writeInt(0);
-        }
-        out.writeByte(getFlags()); // GemStoneAddition
-        JGroupsVersion.writeOrdinal(out, this.version, true); // GemStoneAddition
-    }
-
-    /** Reads the fields in the order written by writeExternal(). */
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        int len=in.readUnsignedByte();
-        if(len > 0) {
-            byte[] a = new byte[len];
-            in.readFully(a);
-            this.ip_addr=InetAddress.getByAddress(a);
-        }
-        //then read the port
-        port=in.readUnsignedShort();
-
-        // GemStoneAddition - process id, etc
-        processId = in.readInt();
-        directPort = in.readInt();
-        vmKind = in.readByte();
-        birthViewId = in.readInt();
-        name = in.readUTF();
-        len=in.readInt();
-        if(len > 0) {
-            additional_data=new byte[len];
-            in.readFully(additional_data, 0, additional_data.length);
-        }
-        int flags = in.readUnsignedByte(); // GemStoneAddition
-        setFlags(flags);
-        readVersion(flags, in); // GemStoneAddition
-    }
-
-    public void writeTo(DataOutputStream out) throws IOException {
-        toData(out);
-    }
-
-    public void readFrom(DataInputStream in) throws IOException {
-        fromData(in);
-    }
-
-    /**
-     * GemStoneAddition - flags. Bit 0x1: should not be coordinator, bit 0x2:
-     * split-brain detection enabled, bit 0x4: a version ordinal follows the flags.
-     */
-    public byte getFlags() {
-        int flags = 0;
-        if (this.shouldNotBeCoordinator) {
-            flags |= 0x1;
-        }
-        if (this.splitBrainEnabled) {
-            flags |= 0x2;
-        }
-        // always add version to flags but allow for absence of this flag
-        flags |= 0x4;
-        return (byte)(flags & 0xff);
-    }
-
-    /**
-     * GemStoneAddition - flags. Only turns attributes on: a cleared bit leaves the
-     * corresponding field untouched.
-     * NOTE(review): confirm this one-way behavior is intended for reused instances.
-     */
-    public void setFlags(int flags) {
-        if ((flags & 0x1) == 0x1) {
-            this.shouldNotBeCoordinator = true;
-        }
-        if ((flags & 0x2) == 0x2) {
-            this.splitBrainEnabled = true;
-        }
-    }
-
-    /**
-     * GemStoneAddition - version. Reads the version ordinal if the 0x4 flag says one
-     * is present; an ordinal of 0 is replaced by the current version ordinal.
-     */
-    public void readVersion(int flags, DataInput in) throws IOException {
-        if ((flags & 0x4) == 0x4) {
-            this.version = JGroupsVersion.readOrdinal(in);
-            if (this.version == 0) {
-                this.version = JChannel.getGfFunctions().getCurrentVersionOrdinal();
-            }
-        }
-    }
-
-    // GemStoneAddition - dataserializable
-    public int getDSFID() {
-        return IP_ADDRESS;
-    }
-
-    /**
-     * GemStoneAddition - dataserializable. Writes address, port, process id, direct
-     * port, vm kind, birth view id, name, the roles payload, flags and version.
-     */
-    public void toData(DataOutput out) throws IOException {
-        if(ip_addr != null) {
-            byte[] address=ip_addr.getAddress();
-            out.writeByte(address.length);
-            out.write(address, 0, address.length);
-        }
-        else {
-            out.writeByte(0);
-        }
-        out.writeShort(port);
-        out.writeInt(processId); // GemStoneAddition
-        out.writeInt(directPort);
-        out.writeByte(vmKind);
-        out.writeInt(birthViewId);
-        out.writeUTF(getName());
-        // for 6.x piggyback the weight in the roles array. For 7.0 we will
-        // need to add it as a field and, hopefully, add an extensible way to
-        // add new attributes that an old version of the product can ignore.
-        // The GossipServer FILE_FORMAT will need to be bumped when we do that.
-        if (memberWeight > 0) {
-            String[] roles = getRoles();
-            String[] forser = new String[roles.length+1];
-            if (roles.length > 0) {
-                System.arraycopy(roles, 0, forser, 0, roles.length);
-            }
-            forser[forser.length-1] = MEMBER_WEIGHT_PREFIX + memberWeight;
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            DataOutput dao = new DataOutputStream(baos);
-            JChannel.getGfFunctions().writeStringArray(forser, dao);
-            byte[] payload = baos.toByteArray();
-            out.writeInt(payload.length);
-            out.write(payload, 0, payload.length);
-        } else if(additional_data != null) {
-            out.writeInt(additional_data.length);
-            out.write(additional_data, 0, additional_data.length);
-        }
-        else {
-            out.writeInt(0);
-        }
-        out.writeByte(getFlags());
-        JGroupsVersion.writeOrdinal(out, this.version, true);
-    }
-
-    /**
-     * GemStoneAddition - dataserializable. Reads what toData() wrote and separates a
-     * piggybacked member weight from the real roles again.
-     */
-    public void fromData(DataInput in) throws IOException {
-        int len=in.readUnsignedByte();
-        if(len > 0) {
-            byte[] a = new byte[len];
-            in.readFully(a);
-            this.ip_addr=InetAddress.getByAddress(a);
-        }
-        port=in.readUnsignedShort();
-        processId = in.readInt(); // GemStoneAddition
-        directPort = in.readInt();
-        vmKind = in.readByte();
-        birthViewId = in.readInt();
-        name = in.readUTF();
-        len=in.readInt();
-        if(len > 0) {
-            additional_data=new byte[len];
-            in.readFully(additional_data, 0, additional_data.length);
-            String roles[] = getRoles();
-            int lastIndex = roles.length-1;
-            if (roles.length > 0 && roles[lastIndex].startsWith(MEMBER_WEIGHT_PREFIX)) {
-                String weightString = roles[lastIndex].substring(MEMBER_WEIGHT_PREFIX.length());
-                memberWeight = Byte.parseByte(weightString);
-                if (lastIndex == 0) {
-                    // the weight was the only entry. setRoles() ignores empty arrays, so
-                    // drop the payload here; otherwise the weight marker would be reported
-                    // as a role and duplicated by the next toData()
-                    additional_data = null;
-                } else {
-                    String[] newroles = new String[lastIndex];
-                    System.arraycopy(roles, 0, newroles, 0, lastIndex);
-                    setRoles(newroles);
-                }
-            }
-        }
-        // GemStoneAddition - flags
-        int flags = in.readUnsignedByte();
-        setFlags(flags);
-        // GemStoneAddition - version
-        readVersion(flags, in);
-    }
-
-    /**
-     * GemStoneAddition - for ack processing we don't need the whole address: only
-     * the address bytes (length 0 if there is no InetAddress) and the port are written.
-     */
-    public void toDataShort(DataOutput out) throws IOException {
-        byte[] address = cachedAddress;
-        if (address == null && ip_addr != null) {
-            address=ip_addr.getAddress();
-            cachedAddress = address;
-        }
-        if (address != null) {
-            out.writeByte(address.length);
-            out.write(address, 0, address.length);
-        }
-        else {
-            // same encoding toData() uses for a missing address; used to be a NullPointerException
-            out.writeByte(0);
-        }
-        out.writeShort(port);
-    }
-
-    /** GemStoneAddition - for ack processing; reads what toDataShort() wrote */
-    public void fromDataShort(DataInput in) throws IOException {
-        int len=in.readUnsignedByte();
-        if (len > 0) {
-            byte[] a = new byte[len];
-            in.readFully(a);
-            this.ip_addr=InetAddress.getByAddress(a);
-        }
-        //then read the port
-        port=in.readUnsignedShort();
-    }
-
-    /**
-     * Returns the cached size if there is one, otherwise adds up the serialized field
-     * sizes and caches the sum.
-     * NOTE(review): the name written with writeUTF() is not part of the sum - confirm
-     * whether that is intentional.
-     */
-    public int size(short version) {
-        if(size >= 0)
-            return size;
-        // address length
-        int tmp_size = Global.BYTE_SIZE;
-        // address
-        if(ip_addr != null)
-            tmp_size+=ip_addr.getAddress().length; // 4 bytes for IPv4
-        // port
-        tmp_size += Global.SHORT_SIZE;
-        // PID
-        tmp_size += Global.INT_SIZE; // GemStoneAddition
-        // direct-port
-        tmp_size += Global.INT_SIZE; // GemStoneAddition
-        // vm-kind
-        tmp_size += Global.BYTE_SIZE;
-        // view-id
-        tmp_size += Global.INT_SIZE; // GemStoneAddition
-        // additional data size
-        tmp_size += Global.INT_SIZE;
-        // additional data
-        if(additional_data != null)
-            tmp_size+=additional_data.length;
-        // flags
-        tmp_size += Global.BYTE_SIZE;
-        // version
-        tmp_size += (this.version < 256? 1 : 3);
-        // GemStoneAddition - ignore durableClientAttributes in size calculations
-        // since client IDs are never used in datagram size estimations
-        setSize(tmp_size);
-        return tmp_size;
-    }
-
-    /**
-     * Creates a new IpAddress for the same address and port and copies process id,
-     * coordinator/split-brain flags, name, version, birth view id and a copy of
-     * additional_data.
-     * NOTE(review): directPort, vmKind, memberWeight and durableClientAttributes are
-     * not copied; they keep whatever the constructor's default attributes set.
-     */
-    @Override // GemStoneAddition
-    public Object clone() {
-        IpAddress ret=new IpAddress(ip_addr, port);
-        ret.processId = this.processId; // GemStoneAddition
-        ret.shouldNotBeCoordinator = this.shouldNotBeCoordinator; // GemStoneAddition
-        ret.splitBrainEnabled = this.splitBrainEnabled; // GemStoneAddition
-        ret.name = this.name; // GemStoneAddition
-        ret.version = this.version; // GemStoneAddition
-        ret.birthViewId = this.birthViewId; // GemStoneAddition
-        if(additional_data != null) {
-            ret.additional_data=new byte[additional_data.length];
-            System.arraycopy(additional_data, 0, ret.additional_data, 0, additional_data.length);
-        }
-        return ret;
-    }
-
-    @Override
-    public short[] getSerializationVersions() {
-        return null;
-    }
-
-    /** @return the cached size, or -1 if it has not been computed */
-    public int getSize() {
-        return size;
-    }
-
-    public void setSize(int size) {
-        this.size = size;
-    }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/LogicalAddress.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/LogicalAddress.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/LogicalAddress.java
deleted file mode 100644
index 862a523..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/LogicalAddress.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: LogicalAddress.java,v 1.9 2005/07/17 11:34:20 chrislott Exp $
-
-package com.gemstone.org.jgroups.stack;
-
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-
-
-/**
- * Logical address that spans the lifetime of a member. Assigned at member (JVM) startup, and
- * retained until member is shutdown. Note that the address does <em>not</em> change on
- * disconnect-connect sequences. For example, when a member is shunned and subsequently
- * readmitted to the group, the member's address (LogicalAddress) remains the same.<br/>
- * An instance of LogicalAddress is generated by the transport protocol. Currently, only
- * UDP_NIO generates LogicalAddresses.<br/>
- * Note that host, timestamp and id are supposed to make LogicalAddress as unique as possible.
- * However, there is a remote chance that 2 instances started on the same machine create their
- * address at exactly the same time, resulting in identical addresses (leading to problems).
- * In the future, I will try to make this totally unique, for example by using the PID of the current
- * process (once available through the JDK), or by locking on a common resource (e.g. /dev/random)
- * to serialize creation. However, as for now, chances are you will never experience this problem.
- * @author Bela Ban, Dec 23 2003
- */
-public class LogicalAddress implements Address {
-    /** Source of the per-JVM id; incremented while holding the LogicalAddress.class lock. */
-    static int count=1;
-    protected String host=null;
-    /** Creation time in milliseconds, taken from System.currentTimeMillis() in init(). */
-    protected long timestamp=0;
-    protected int id=0;
-    protected boolean multicast_addr=false;
-
-    /** Address of the primary physical address. This is set to the sender when a message is received.
-     * If this field is set, we will send unicast messages only to this address, not to all addresses listed
-     * in physical_addrs; this reduces the number of msgs we have to send.<br/>
-     * Note that this field is not shipped across the wire.
-     */
-    transient SocketAddress primary_physical_addr=null;
-
-    /** List<SocketAddress> of physical addresses */
-    protected ArrayList physical_addrs=null;
-
-    /** To tack on some additional data */
-    byte[] additional_data=null;
-
-
-    // Used only by Externalization
-    public LogicalAddress() {
-    }
-
-
-    /** Use this constructor to create an instance, not the null-constructor */
-    public LogicalAddress(String host_name, List physical_addrs) {
-        init(host_name, physical_addrs);
-    }
-
-
-    /**
-     * Fills in host (the local host name, or "localhost" if that lookup fails, when
-     * host_name is null), the creation timestamp, the next id and a copy of the
-     * given physical addresses.
-     */
-    protected void init(String host_name, List physical_addrs) {
-        if(host_name != null) {
-            this.host=host_name;
-        }
-        else {
-            try {
-                host=InetAddress.getLocalHost().getHostName();
-            }
-            catch(Exception e) {
-                host="localhost";
-            }
-        }
-
-        timestamp=System.currentTimeMillis();
-
-        synchronized(LogicalAddress.class) {
-            id=count++;
-        }
-
-        if(physical_addrs != null) {
-            this.physical_addrs=new ArrayList(physical_addrs);
-        }
-    }
-
-    // GemStoneAddition
-    public boolean preferredForCoordinator() {
-        return true;
-    }
-
-    // GemStoneAddition
-    public boolean splitBrainEnabled() {
-        return false;
-    }
-
-    @Override
-    public int getBirthViewId() {
-        return -1;
-    }
-
-    @Override
-    public short getVersionOrdinal() {
-        return -1;
-    }
-
-    public String getHost() {
-        return host;
-    }
-
-    public long getTimestamp() {
-        return timestamp;
-    }
-
-    public long getId() {
-        return id;
-    }
-
-    public SocketAddress getPrimaryPhysicalAddress() {
-        return primary_physical_addr;
-    }
-
-    public void setPrimaryPhysicalAddress(SocketAddress primary_physical_addr) {
-        this.primary_physical_addr=primary_physical_addr;
-    }
-
-    /**
-     * Returns a <em>copy</em> of the list of physical addresses. Reason for the copy is that the list is not supposed
-     * to be modified (should be immutable).
-     * @return List of physical addresses (return value maybe null)
-     */
-    public ArrayList getPhysicalAddresses() {
-        return physical_addrs != null? (ArrayList)physical_addrs.clone() : null;
-    }
-
-    /**
-     * For internal use only. Don't use this method!
-     * Adds the address unless it is null or already in the list.
-     * @param addr
-     */
-    public void addPhysicalAddress(SocketAddress addr) {
-        if(addr != null) {
-            if(physical_addrs == null)
-                physical_addrs=new ArrayList();
-            if(!physical_addrs.contains(addr))
-                physical_addrs.add(addr);
-        }
-    }
-
-    /**
-     * For internal use only. Don't use this method !
-     * @param addr
-     */
-    public void removePhysicalAddress(SocketAddress addr) {
-        if(addr != null && physical_addrs != null)
-            physical_addrs.remove(addr);
-    }
-
-    /**
-     * For internal use only. Don't use this method !
-     * Empties the list but keeps the (now empty) list instance.
-     */
-    public void removeAllPhysicalAddresses() {
-        if(physical_addrs != null)
-            physical_addrs.clear();
-    }
-
-    public boolean isMulticastAddress() {
-        return false; // LogicalAddresses can never be multicast
-    }
-
-    /** @return the fixed value 22, independent of the version and of the actual contents */
-    public int size(short version) {
-        return 22;
-    }
-
-    /**
-     * Returns the additional_data.
-     * @return byte[]
-     */
-    public byte[] getAdditionalData() {
-        return additional_data;
-    }
-
-    /**
-     * Sets the additional_data.
-     * @param additional_data The additional_data to set
-     */
-    public void setAdditionalData(byte[] additional_data) {
-        this.additional_data = additional_data;
-    }
-
-
-    /**
-     * Establishes an order between 2 addresses. Assumes other is a non-null LogicalAddress.
-     * @return 0 for equality, value less than 0 if smaller, greater than 0 if greater.
-     */
-    public int compare(LogicalAddress other) {
-        return compareTo(other);
-    }
-
-
-    /**
-     * implements the java.lang.Comparable interface; orders by host, then timestamp,
-     * then id
-     * @see Comparable
-     * @param o - the Object to be compared
-     * @return a negative integer, zero, or a positive integer as this object is less than,
-     *         equal to, or greater than the specified object.
-     * @exception ClassCastException - if the specified object's type prevents it
-     *         from being compared to this Object.
-     */
-    public int compareTo(Object o) {
-        if (!(o instanceof LogicalAddress)) // also rejects null
-            throw new ClassCastException("LogicalAddress.compareTo(): comparison between different classes");
-        LogicalAddress other = (LogicalAddress) o;
-
-        int rc=this.host.compareTo(other.host);
-        if(rc != 0) return rc;
-        if(this.timestamp != other.timestamp)
-            return this.timestamp < other.timestamp? -1 : 1;
-        if(this.id != other.id)
-            return this.id < other.id? -1 : 1;
-        return 0;
-    }
-
-
-    /** Two logical addresses are equal when host, timestamp and id all match. */
-    @Override // GemStoneAddition
-    public boolean equals(Object obj) {
-        if (!(obj instanceof LogicalAddress)) return false; // GemStoneAddition - also covers null
-        return compareTo(obj) == 0;
-    }
-
-
-    @Override // GemStoneAddition
-    public int hashCode() {
-        int retval=(int)(host.hashCode() + timestamp + id);
-        return retval;
-    }
-
-
-    @Override // GemStoneAddition
-    public String toString() {
-        return toString(false);
-    }
-
-
-    /**
-     * @param print_details if true the creation date and the physical addresses are included
-     * @return "host:id", optionally followed by the details and the size of the additional data
-     */
-    public String toString(boolean print_details) {
-        StringBuilder sb=new StringBuilder();
-
-        sb.append(host);
-        sb.append(':').append(id);
-        if(print_details) {
-            sb.append(" (created ").append(new Date(timestamp)).append(')');
-            if(physical_addrs != null)
-                sb.append("\nphysical addrs: ").append(physical_addrs);
-        }
-        if(additional_data != null)
-            sb.append(" (additional data: ").append(additional_data.length).append(" bytes)");
-        return sb.toString();
-    }
-
-
-    /**
-     * Writes host, timestamp, id, the number of physical addresses followed by the
-     * list itself (only if it is non-empty) and the additional data.
-     */
-    public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeObject(host);
-        out.writeLong(timestamp);
-        out.writeInt(id);
-
-        // readExternal() reads the list only when the count is positive, so an empty
-        // list must not be written either - doing so used to desynchronize the stream
-        int numAddrs = physical_addrs != null? physical_addrs.size() : 0;
-        out.writeInt(numAddrs);
-        if(numAddrs > 0)
-            out.writeObject(physical_addrs);
-
-        if(additional_data != null) {
-            out.writeInt(additional_data.length);
-            out.write(additional_data, 0, additional_data.length);
-        }
-        else
-            out.writeInt(0);
-    }
-
-
-    /** Reads the fields in the order written by writeExternal(). */
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        int len;
-
-        host=(String)in.readObject();
-        timestamp=in.readLong();
-        id=in.readInt();
-
-        len=in.readInt();
-        if(len > 0) {
-            physical_addrs=(ArrayList)in.readObject();
-        }
-
-        len=in.readInt();
-        if(len > 0) {
-            additional_data=new byte[len];
-            in.readFully(additional_data, 0, additional_data.length);
-        }
-    }
-
-
-    /**
-     * Streams host, timestamp, id, the multicast flag, the physical addresses (via an
-     * ObjectOutputStream layered on out) and the additional data.
-     */
-    public void writeTo(DataOutputStream out) throws IOException {
-        Util.writeString(host, out);
-        out.writeLong(timestamp);
-        out.writeInt(id);
-        out.writeBoolean(multicast_addr);
-        ObjectOutputStream oos=new ObjectOutputStream(out);
-        oos.writeObject(physical_addrs);
-        // flush only: close() would also close the caller's stream, which is still
-        // needed for the additional data below
-        oos.flush();
-        Util.writeByteBuffer(additional_data, out);
-    }
-
-    /**
-     * Reads what writeTo() wrote.
-     * @throws IOException on stream errors, or if a class of the physical address list
-     *         cannot be loaded
-     */
-    public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
-        host=Util.readString(in);
-        timestamp=in.readLong();
-        id=in.readInt();
-        multicast_addr=in.readBoolean();
-        ObjectInputStream ois=new ObjectInputStream(in);
-        try {
-            physical_addrs=(ArrayList)ois.readObject();
-        }
-        catch(ClassNotFoundException e) {
-            // used to be swallowed silently, leaving the address half-initialized
-            throw new IOException("LogicalAddress.readFrom(): unable to read physical addresses", e);
-        }
-        additional_data=Util.readByteBuffer(in);
-    }
-
-    /**
-     * Builds a new LogicalAddress with the same field values. The physical address
-     * list is copied; additional_data is shared with the original, not copied.
-     */
-    @Override // GemStoneAddition
-    public Object clone() throws CloneNotSupportedException {
-        LogicalAddress ret=new LogicalAddress();
-        ret.host=host;
-        ret.timestamp=timestamp;
-        ret.id=id;
-        ret.multicast_addr=multicast_addr;
-        ret.additional_data=additional_data;
-        ret.primary_physical_addr=primary_physical_addr;
-        if(physical_addrs != null)
-            ret.physical_addrs=(ArrayList)physical_addrs.clone();
-        return ret;
-    }
-
-    /** @return the result of clone(), or null if cloning is not supported */
-    public LogicalAddress copy() {
-        try {
-            return (LogicalAddress)clone();
-        }
-        catch(CloneNotSupportedException e) {
-            return null;
-        }
-    }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/MessageProtocol.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/MessageProtocol.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/MessageProtocol.java
deleted file mode 100644
index 0435e44..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/stack/MessageProtocol.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: MessageProtocol.java,v 1.5 2005/11/12 06:37:41 belaban Exp $
-
-package com.gemstone.org.jgroups.stack;
-
-
-
-import com.gemstone.org.jgroups.*;
-import com.gemstone.org.jgroups.blocks.GroupRequest;
-import com.gemstone.org.jgroups.blocks.RequestCorrelator;
-import com.gemstone.org.jgroups.blocks.RequestHandler;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.Rsp;
-import com.gemstone.org.jgroups.util.RspList;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.util.Vector;
-
-
-
-
-/**
- * Based on Protocol, but incorporates RequestCorrelator and GroupRequest: the latter can
- * be used to mcast messages to all members and receive their responses.<p>
- * A protocol based on this template can send messages to all members and receive all, a single,
- * n, or no responses. Requests directed towards the protocol can be handled by overriding
- * method <code>handle</code>.<p>
- * Requests and responses are in the form of <code>Message</code>s, which would typically need to
- * contain information pertaining to the request/response, e.g. in the form of objects contained
- * in the message. To use remote method calls, use <code>RpcProtocol</code> instead.<p>
- * Typical use of a <code>MessageProtocol</code> would be when a protocol needs to interact with
- * its peer protocols at each of the members' protocol stacks. A simple protocol like fragmentation,
- * which does not need to interact with other instances of fragmentation, may simply subclass
- * <code>Protocol</code> instead.
- * @author Bela Ban
- */
-public abstract class MessageProtocol extends Protocol implements RequestHandler {
- protected RequestCorrelator _corr=null;
- protected final Vector members=new Vector();
-
-
- @Override // GemStoneAddition
- public void start() throws Exception {
- if(_corr == null)
- _corr=new RequestCorrelator(getName(), this, this);
- _corr.start();
- }
-
- @Override // GemStoneAddition
- public void stop() {
- if(_corr != null) {
- _corr.stop();
- // _corr=null;
- }
- }
-
-
- /**
- Cast a message to all members, and wait for <code>mode</code> responses. The responses are
- returned in a response list, where each response is associated with its sender.<p>
- Uses <code>GroupRequest</code>.
- @param dests The members from which responses are expected. If it is null, replies from all members
- are expected. The request itself is multicast to all members.
- @param msg The message to be sent to n members
- @param mode Defined in <code>GroupRequest</code>. The number of responses to wait for:
- <ol>
- <li>GET_FIRST: return the first response received.
- <li>GET_ALL: wait for all responses (minus the ones from suspected members)
- <li>GET_MAJORITY: wait for a majority of all responses (relative to the grp size)
- <li>GET_ABS_MAJORITY: wait for majority (absolute, computed once)
- <li>GET_N: wait for n responses (may block if n > group size)
- <li>GET_NONE: wait for no responses, return immediately (non-blocking)
- </ol>
- @param timeout If 0: wait forever. Otherwise, wait for <code>mode</code> responses
- <em>or</em> timeout time.
- @return RspList A list of responses. Each response is an <code>Object</code> and associated
- to its sender.
- */
- public RspList castMessage(Vector dests, Message msg, int mode, long timeout) {
- GroupRequest _req=null;
- Vector real_dests=dests != null? (Vector)dests.clone() : (Vector)members.clone();
-
- // This marks message as sent by us ! (used in up())
- // msg.addHeader(new MsgProtHeader(getName())); ++ already done by RequestCorrelator
-
- _req=new GroupRequest(msg, _corr, real_dests, mode, timeout, 0);
- _req.execute();
-
- return _req.getResults();
- }
-
-
- /**
- Sends a message to a single member (destination = msg.dest) and returns the response.
- The message's destination must be non-null !
- */
- public Object sendMessage(Message msg, int mode, long timeout) throws TimeoutException, SuspectedException {
- Vector mbrs=new Vector();
- RspList rsp_list=null;
- Object dest=msg.getDest();
- Rsp rsp;
- GroupRequest _req=null;
-
- if(dest == null) {
- System.out.println("MessageProtocol.sendMessage(): the message's destination is null ! " +
- "Cannot send message !");
- return null;
- }
-
-
- mbrs.addElement(dest); // dummy membership (of destination address)
-
-
- _req=new GroupRequest(msg, _corr, mbrs, mode, timeout, 0);
- _req.execute();
-
- if(mode == GroupRequest.GET_NONE)
- return null;
-
-
- rsp_list=_req.getResults();
-
- if(rsp_list.size() == 0) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.MessageProtocol_RESPONSE_LIST_IS_EMPTY);
- return null;
- }
- if(rsp_list.size() > 1)
- if(log.isErrorEnabled()) log.error("response list contains " +
- "more that 1 response; returning first response");
- rsp=(Rsp)rsp_list.elementAt(0);
- if(rsp.wasSuspected())
- throw new SuspectedException(dest);
- if(!rsp.wasReceived())
- throw new TimeoutException();
- return rsp.getValue();
- }
-
-
- /**
- Processes a request destined for this layer. The return value is sent as response.
- */
- public Object handle(Message req) {
- System.out.println("MessageProtocol.handle(): this method should be overridden !");
- return null;
- }
-
-
- /**
- * Handle an event coming from the layer below
- */
- @Override // GemStoneAddition
- public final void up(Event evt) {
- Message msg;
- Object hdr;
-
- switch(evt.getType()) {
- case Event.VIEW_CHANGE:
- updateView((View)evt.getArg());
- break;
- default:
- if(!handleUpEvent(evt)) return;
-
- if(evt.getType() == Event.MSG) {
- msg=(Message)evt.getArg();
- hdr=msg.getHeader(getName());
- if(!(hdr instanceof RequestCorrelator.Header))
- break;
- }
- // [[[ TODO
- // RequestCorrelator.receive() is currently calling passUp()
- // itself. Only _this_ method should call passUp()!
- // So return instead of breaking until fixed (igeorg)
- // ]]] TODO
- if(_corr != null) {
- _corr.receive(evt);
- return;
- }
- else
- if(log.isWarnEnabled()) log.warn("Request correlator is null, evt=" + Util.printEvent(evt));
-
- break;
- }
-
- passUp(evt);
- }
-
-
- /**
- * This message is not originated by this layer, therefore we can just
- * pass it down without having to go through the request correlator.
- * We do this ONLY for messages !
- */
- @Override // GemStoneAddition
- public final void down(Event evt) {
- switch(evt.getType()) {
- case Event.VIEW_CHANGE:
- updateView((View)evt.getArg());
- if(!handleDownEvent(evt)) return;
- break;
- case Event.MSG:
- if(!handleDownEvent(evt)) return;
- break;
- default:
- if(!handleDownEvent(evt)) return;
- break;
- }
-
- passDown(evt);
- }
-
-
- protected void updateView(View new_view) {
- Vector new_mbrs=new_view.getMembers();
- if(new_mbrs != null) {
- synchronized(members) {
- members.removeAllElements();
- members.addAll(new_mbrs);
- }
- }
- }
-
-
- /**
- Handle up event. Return false if it should not be passed up the stack.
- */
- protected boolean handleUpEvent(Event evt) {
- // override in subclasses
- return true;
- }
-
- /**
- Handle down event. Return false if it should not be passed down the stack.
- */
- protected boolean handleDownEvent(Event evt) {
- // override in subclasses
- return true;
- }
-
-
-}