You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/08/21 23:22:31 UTC
[07/51] [partial] incubator-geode git commit: GEODE-77 removing the
old jgroups subproject
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/PBCAST.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/PBCAST.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/PBCAST.java
deleted file mode 100644
index c1bf78c..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/PBCAST.java
+++ /dev/null
@@ -1,1043 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: PBCAST.java,v 1.15 2005/11/03 11:42:58 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols.pbcast;
-
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.View;
-import com.gemstone.org.jgroups.stack.NakReceiverWindow;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.List;
-import com.gemstone.org.jgroups.util.Queue;
-import com.gemstone.org.jgroups.util.QueueClosedException;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.util.*;
-
-
-/**
- * Implementation of probabilistic broadcast. Sends group messages via unreliable multicast. Gossips regularly to
- * a random subset of group members to retransmit missing messages. Gossiping is used both for bringing all
- * members to the same state (having received the same messages) and to garbage-collect messages seen by all members
- * (gc is piggybacked in gossip messages). See DESIGN for more details.
- * @author Bela Ban
- */
-public class PBCAST extends Protocol implements Runnable {
- boolean operational=false;
- long seqno=1; // seqno for messages. 1 for the first message
- long gossip_round=1; // identifies the gossip (together with sender)
- Address local_addr=null;
- final Hashtable digest=new Hashtable(); // stores all messages from members (key: member, val: NakReceiverWindow)
-
- // GemStoneAddition -- access synchronized on this
- Thread gossip_thread=null;
- GossipHandler gossip_handler=null; // removes gossips and other requests from queue and handles them
- final Queue gossip_queue=new Queue(); // (bounded) queue for incoming gossip requests
- int max_queue=100; // max elements in gossip_queue (bounded buffer)
- long gossip_interval=5000; // gossip every 5 seconds
- double subset=0.1; // send gossip messages to a subset consisting of 10% of the mbrship
- long desired_avg_gossip=30000; // receive a gossip every 30 secs on average
- final Vector members=new Vector();
- final List gossip_list=new List(); // list of gossips received, we periodically purge it (FIFO)
- int max_gossip_cache=100; // number of gossips to keep until gossip list is purged
- int gc_lag=30; // how many seqnos should we lag behind (see DESIGN)
- final Hashtable invalid_gossipers=new Hashtable(); // keys=Address, val=Integer (number of gossips from suspected mbrs)
- static/*GemStoneAddition*/ final int max_invalid_gossips=2; // max number of gossip from non-member before that member is shunned
- Vector seen_list=null;
- boolean shun=false; // whether invalid gossipers will be shunned or not
- boolean dynamic=true; // whether to use dynamic or static gosssip_interval (overrides gossip_interval)
- volatile /*GemStoneAddition*/ boolean skip_sleep=true;
- boolean mcast_gossip=true; // use multicast for gossips (subset will be ignored, send to all members)
-
-
- @Override // GemStoneAddition
- public String getName() {
- return "PBCAST";
- }
-
-
- @Override // GemStoneAddition
- public Vector providedUpServices() {
- Vector retval=new Vector();
- retval.addElement(Integer.valueOf(Event.GET_DIGEST));
- retval.addElement(Integer.valueOf(Event.SET_DIGEST));
- retval.addElement(Integer.valueOf(Event.GET_DIGEST_STATE));
- return retval;
- }
-
-
- @Override // GemStoneAddition
- public void stop() {
- stopGossipThread();
- stopGossipHandler();
- operational=false;
- }
-
-
- @Override // GemStoneAddition
- public void up(Event evt) {
- Message m;
- PbcastHeader hdr;
- Address sender=null;
-
- switch(evt.getType()) {
- case Event.MSG:
- m=(Message) evt.getArg();
- if(m.getDest() != null && !m.getDest().isMulticastAddress()) {
- if(!(m.getHeader(getName()) instanceof PbcastHeader))
- break; // unicast address: not null and not mcast, pass up unchanged
- }
-
- // discard all multicast messages until we become operational (transition from joiner to member)
- if(!operational) {
-
- if(log.isInfoEnabled()) log.info("event was discarded as I'm not yet operational. Event: " +
- Util.printEvent(evt));
- return; // don't pass up
- }
-
- if(m.getHeader(getName()) instanceof PbcastHeader)
- hdr=(PbcastHeader) m.removeHeader(getName());
- else {
- sender=m.getSrc();
-
- if(log.isErrorEnabled()) log.error("PbcastHeader expected, but received header of type " +
- m.getHeader(getName()).getClass().getName() + " from " + sender +
- ". Passing event up unchanged");
- break;
- }
-
- switch(hdr.type) {
- case PbcastHeader.MCAST_MSG: // messages are handled directly (high priority)
- handleUpMessage(m, hdr);
- return;
-
- // all other requests are put in the bounded gossip queue (discarded if full). this helps to ensure
- // that no 'gossip storms' will occur (overflowing the buffers and the network)
- case PbcastHeader.GOSSIP:
- case PbcastHeader.XMIT_REQ:
- case PbcastHeader.XMIT_RSP:
- case PbcastHeader.NOT_MEMBER:
- try {
- if(gossip_queue.size() >= max_queue) {
-
- if(warn) log.warn("gossip request " +
- PbcastHeader.type2String(hdr.type) + " discarded because " +
- "gossip_queue is full (number of elements=" + gossip_queue.size() + ')');
- return;
- }
- gossip_queue.add(new GossipEntry(hdr, m.getSrc(), m.getBuffer()));
- }
- catch(Exception ex) {
- if(warn) log.warn("exception adding request to gossip_queue, details=" + ex);
- }
- return;
-
- default:
- if(log.isErrorEnabled()) log.error(ExternalStrings.PBCAST_TYPE__0__OF_PBCASTHEADER_NOT_KNOWN_, hdr.type);
- return;
- }
-
- case Event.SET_LOCAL_ADDRESS:
- local_addr=(Address) evt.getArg();
- break; // pass up
- }
-
- passUp(evt); // pass up by default
- }
-
-
- @Override // GemStoneAddition
- public void down(Event evt) {
- PbcastHeader hdr;
- Message m, copy;
- View v;
- Vector mbrs;
- Address key;
- NakReceiverWindow win;
-
-
- switch(evt.getType()) {
-
- case Event.MSG:
- m=(Message) evt.getArg();
- if(m.getDest() != null && !m.getDest().isMulticastAddress()) {
- break; // unicast address: not null and not mcast, pass down unchanged
- }
- else { // multicast address
- hdr=new PbcastHeader(PbcastHeader.MCAST_MSG, seqno);
- m.putHeader(getName(), hdr);
-
- // put message in NakReceiverWindow (to be on the safe side if we don't receive it ...)
- synchronized(digest) {
- win=(NakReceiverWindow) digest.get(local_addr);
- if(win == null) {
- if(log.isInfoEnabled()) log.info("NakReceiverWindow for sender " + local_addr +
- " not found. Creating new NakReceiverWindow starting at seqno=" + seqno);
- win=new NakReceiverWindow(local_addr, seqno);
- digest.put(local_addr, win);
- }
- copy=m.copy();
- copy.setSrc(local_addr);
- win.add(seqno, copy);
- }
- seqno++;
- break;
- }
-
- case Event.SET_DIGEST:
- setDigest((Digest) evt.getArg());
- return; // don't pass down
-
- case Event.GET_DIGEST: // don't pass down
- passUp(new Event(Event.GET_DIGEST_OK, getDigest()));
- return;
-
- case Event.GET_DIGEST_STATE: // don't pass down
- passUp(new Event(Event.GET_DIGEST_STATE_OK, getDigest()));
- return;
-
- case Event.VIEW_CHANGE:
- v=(View) evt.getArg();
- if(v == null) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.PBCAST_VIEW_IS_NULL_);
- break;
- }
- mbrs=v.getMembers();
-
- // update internal membership list
- synchronized(members) {
- members.removeAllElements();
- for(int i=0; i < mbrs.size(); i++)
- members.addElement(mbrs.elementAt(i));
- }
-
- // delete all members in digest that are not in new membership list
- if(mbrs.size() > 0) {
- synchronized(digest) {
- for(Enumeration e=digest.keys(); e.hasMoreElements();) {
- key=(Address) e.nextElement();
- if(!mbrs.contains(key)) {
- win=(NakReceiverWindow) digest.get(key);
- win.reset();
- digest.remove(key);
- }
- }
- }
- }
-
- // add all members from new membership list that are not yet in digest
- for(int i=0; i < mbrs.size(); i++) {
- key=(Address) mbrs.elementAt(i);
- if(!digest.containsKey(key)) {
- digest.put(key, new NakReceiverWindow(key, 1));
- }
- }
-
- if(dynamic) {
- gossip_interval=computeGossipInterval(members.size(), desired_avg_gossip);
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.PBCAST_VIEW_CHANGE_GOSSIP_INTERVAL_0, gossip_interval);
- if(gossip_thread != null) {
- skip_sleep=true;
- gossip_thread.interrupt(); // wake up and sleep according to the new gossip_interval
- }
- }
-
- startGossipThread(); // will only be started if not yet running
- startGossipHandler();
- break;
-
- case Event.BECOME_SERVER:
- operational=true;
- break;
- }
-
- passDown(evt);
- }
-
-
- /** Gossip thread. Sends gossips containing a message digest every <code>gossip_interval</code> msecs */
- public void run() {
- for (;;) { // GemStoneAddition remove coding anti-pattern
- if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition
- if(dynamic) {
- gossip_interval=computeGossipInterval(members.size(), desired_avg_gossip);
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.PBCAST_GOSSIP_INTERVAL_0, gossip_interval);
- }
-
- try { // GemStoneAddition
- Util.sleep(gossip_interval);
- }
- catch (InterruptedException e) {
- break; // Exit loop and thread
- }
- if(skip_sleep)
- skip_sleep=false;
- else
- sendGossip();
- }
- }
-
-
- /** Setup the Protocol instance acording to the configuration string */
- @Override // GemStoneAddition
- public boolean setProperties(Properties props) {super.setProperties(props);
- String str;
-
- str=props.getProperty("dynamic");
- if(str != null) {
- dynamic=Boolean.valueOf(str).booleanValue();
- props.remove("dynamic");
- }
-
- str=props.getProperty("shun");
- if(str != null) {
- shun=Boolean.valueOf(str).booleanValue();
- props.remove("shun");
- }
-
- str=props.getProperty("gossip_interval");
- if(str != null) {
- gossip_interval=Long.parseLong(str);
- props.remove("gossip_interval");
- }
-
- str=props.getProperty("mcast_gossip");
- if(str != null) {
- mcast_gossip=Boolean.valueOf(str).booleanValue();
- props.remove("mcast_gossip");
- }
-
- str=props.getProperty("subset");
- if(str != null) {
- subset=Double.parseDouble(str);
- props.remove("subset");
- }
-
- str=props.getProperty("desired_avg_gossip");
- if(str != null) {
- desired_avg_gossip=Long.parseLong(str);
- props.remove("desired_avg_gossip");
- }
-
- str=props.getProperty("max_queue");
- if(str != null) {
- max_queue=Integer.parseInt(str);
- props.remove("max_queue");
- }
-
- str=props.getProperty("max_gossip_cache");
- if(str != null) {
- max_gossip_cache=Integer.parseInt(str);
- props.remove("max_gossip_cache");
- }
-
- str=props.getProperty("gc_lag");
- if(str != null) {
- gc_lag=Integer.parseInt(str);
- props.remove("gc_lag");
- }
-
- if(props.size() > 0) {
- log.error(ExternalStrings.PBCAST_PBCASTSETPROPERTIES_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
-
- return false;
- }
- return true;
- }
-
-
-
- /* --------------------------------- Private Methods --------------------------------------------- */
-
-
- /**
- Ensures that FIFO is observed for all messages for a certain member. The NakReceiverWindow corresponding
- to a certain sender is looked up in a hashtable. Then, the message is added to the NakReceiverWindow.
- As many messages as possible are then removed from the table and passed up.
- */
- void handleUpMessage(Message m, PbcastHeader hdr) {
- Address sender=m.getSrc();
- NakReceiverWindow win=null;
- Message tmpmsg;
- long tmp_seqno=hdr.seqno;
-
- if(sender == null) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.PBCAST_SENDER_IS_NULL);
- return;
- }
-
- synchronized(digest) {
- win=(NakReceiverWindow) digest.get(sender);
- if(win == null) {
- if(warn) log.warn("NakReceiverWindow for sender " + sender +
- " not found. Creating new NakReceiverWindow starting at seqno=" + tmp_seqno);
- win=new NakReceiverWindow(sender, tmp_seqno);
- digest.put(sender, win);
- }
-
- // *************************************
- // The header was removed before, so we add it again for the NakReceiverWindow. When there is a
- // retransmission request, the header will already be attached to the message (both message and
- // header are *copied* into delivered_msgs when a message is removed from NakReceiverWindow).
- // *************************************
- m.putHeader(getName(), hdr);
- win.add(tmp_seqno, m);
-
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.PBCAST_RECEIVER_WINDOW_FOR__0__IS__1, new Object[] {sender, win});
-
- // Try to remove as many message as possible and send them up the stack
- while((tmpmsg=win.remove()) != null) {
- tmpmsg.removeHeader(getName()); // need to remove header again, so upper protocols don't get confused
- passUp(new Event(Event.MSG, tmpmsg));
- }
-
- // Garbage collect messages if singleton member (because then we won't receive any gossips, triggering
- // garbage collection)
- if(members.size() == 1) {
- tmp_seqno=Math.max(tmp_seqno - gc_lag, 0);
- if(tmp_seqno <= 0) {
- }
- else {
- if(trace) log.trace("deleting messages < " + tmp_seqno + " from " + sender);
- win.stable(tmp_seqno);
- }
- }
- }
- }
-
-
- /**
- * Returns for each sender the 'highest seen' seqno from the digest. Highest seen means the
- * highest seqno without any gaps, e.g. if for a sender P the messages 2 3 4 6 7 were received,
- * then only 2, 3 and 4 can be delivered, so 4 is the highest seen. 6 and 7 cannot because there
- * 5 is missing. If there are no message, the highest seen seqno is -1.
- */
- Digest getDigest() {
- Digest ret=new Digest(digest.size());
- long highest_seqno, lowest_seqno;
- Address key;
- NakReceiverWindow win;
-
- for(Enumeration e=digest.keys(); e.hasMoreElements();) {
- key=(Address) e.nextElement();
- win=(NakReceiverWindow) digest.get(key);
- lowest_seqno=win.getLowestSeen();
- highest_seqno=win.getHighestSeen();
- ret.add(key, lowest_seqno, highest_seqno);
- }
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.PBCAST_DIGEST_IS__0, ret);
-
- return ret;
- }
-
-
- /**
- * Sets (or resets) the contents of the 'digest' table. Its current messages will be deleted and the
- * NakReceiverTables reset.
- */
- void setDigest(Digest d) {
- NakReceiverWindow win;
-
- long tmp_seqno=1;
-
- synchronized(digest) {
- for(Enumeration e=digest.elements(); e.hasMoreElements();) {
- win=(NakReceiverWindow) e.nextElement();
- win.reset();
- }
- digest.clear();
-
-
- Map.Entry entry;
- Address sender;
- com.gemstone.org.jgroups.protocols.pbcast.Digest.Entry val;
- for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {
- entry=(Map.Entry)it.next();
- sender=(Address)entry.getKey();
- if(sender == null) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.PBCAST_CANNOT_SET_ITEM_BECAUSE_SENDER_IS_NULL);
- continue;
- }
- val=(com.gemstone.org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
- tmp_seqno=val.high_seqno;
- digest.put(sender, new NakReceiverWindow(sender, tmp_seqno + 1)); // next to expect, digest had *last* seen !
- }
- }
- }
-
-
- String printDigest() {
- long highest_seqno;
- Address key;
- NakReceiverWindow win;
- StringBuffer sb=new StringBuffer();
-
- for(Enumeration e=digest.keys(); e.hasMoreElements();) {
- key=(Address) e.nextElement();
- win=(NakReceiverWindow) digest.get(key);
- highest_seqno=win.getHighestSeen();
- sb.append(key + ": " + highest_seqno + '\n');
- }
- return sb.toString();
- }
-
-
- String printIncomingMessageQueue() {
- StringBuffer sb=new StringBuffer();
- NakReceiverWindow win;
-
- win=(NakReceiverWindow) digest.get(local_addr);
- sb.append(win);
- return sb.toString();
- }
-
-
- synchronized /* GemStoneAddition */ void startGossipThread() {
- if(gossip_thread == null) {
- gossip_thread=new Thread(this);
- gossip_thread.setDaemon(true);
- gossip_thread.start();
- }
- }
-
-
- synchronized /* GemStoneAddition */ void stopGossipThread() {
- Thread tmp;
-
- if(gossip_thread != null) {
- if(gossip_thread.isAlive()) {
- tmp=gossip_thread;
- gossip_thread=null;
- tmp.interrupt();
- tmp=null;
- }
- }
-// gossip_thread=null; GemStoneAddition
- }
-
-
- void startGossipHandler() {
- if(gossip_handler == null) {
- gossip_handler=new GossipHandler(gossip_queue);
- gossip_handler.start();
- }
- }
-
- void stopGossipHandler() {
- if(gossip_handler != null) {
- gossip_handler.stop();
- gossip_handler=null;
- }
- }
-
-
- /**
- * Send a gossip message with a message digest of the highest seqnos seen per sender to a subset
- * of the current membership. Exclude self (I receive all mcasts sent by myself).
- */
- void sendGossip() {
- Vector current_mbrs=(Vector) members.clone();
- Vector subset_mbrs=null;
- Gossip gossip=null;
- Message msg;
- Address dest;
- PbcastHeader hdr;
-
-
- if(local_addr != null)
- current_mbrs.remove(local_addr); // don't pick myself
-
- if(mcast_gossip) { // send gossip to all members using a multicast
- gossip=new Gossip(local_addr, gossip_round, getDigest().copy(), null); // not_seen list is null, prevents forwarding
- for(int i=0; i < current_mbrs.size(); i++) // all members have seen this gossip. Used for garbage collection
- gossip.addToSeenList((Address) current_mbrs.elementAt(i));
- hdr=new PbcastHeader(gossip, PbcastHeader.GOSSIP);
- msg=new Message(); // null dest == multicast to all members
- msg.putHeader(getName(), hdr);
-
-
- if(log.isInfoEnabled()) log.info("(from " + local_addr +
- ") multicasting gossip " + gossip.shortForm() + " to all members");
-
- passDown(new Event(Event.MSG, msg));
- }
- else {
- subset_mbrs=Util.pickSubset(current_mbrs, subset);
-
- for(int i=0; i < subset_mbrs.size(); i++) {
- gossip=new Gossip(local_addr, gossip_round, getDigest().copy(), (Vector) current_mbrs.clone());
- gossip.addToSeenList(local_addr);
- hdr=new PbcastHeader(gossip, PbcastHeader.GOSSIP);
- dest=(Address) subset_mbrs.elementAt(i);
- msg=new Message(dest, null, null);
- msg.putHeader(getName(), hdr);
-
-
- if(log.isInfoEnabled()) log.info("(from " + local_addr +
- ") sending gossip " + gossip.shortForm() + " to " + subset_mbrs);
-
- passDown(new Event(Event.MSG, msg));
- }
- }
-
- gossip_round++;
- }
-
-
- /**
- * MOST IMPORTANT METHOD IN THIS CLASS !! This guy really decides how a gossip reaches all members,
- * or whether it will flood the network !<p>
- * Scrutinize the gossip received and request retransmission of messages that we haven't received yet.
- * A gossip has a digest which carries for each sender the lowest and highest seqno seen. We check
- * this range against our own digest and request retransmission of missing messages if needed.<br>
- * <em>See DESIGN for a description of this method</em>
- */
- void handleGossip(Gossip gossip) {
- long my_low=0, my_high=0, their_low, their_high;
- Hashtable ht=null;
- Digest their_digest;
- NakReceiverWindow win;
- Message msg;
- Address dest;
- Vector new_dests;
- PbcastHeader hdr;
- List missing_msgs; // list of missing messages (for retransmission) (List of Longs)
-
- if(trace)
- log.trace("(from " + local_addr + ") received gossip " + gossip.shortForm() + " from " + gossip.sender);
-
- if(gossip == null || gossip.digest == null) {
- if(warn) log.warn("gossip is null or digest is null");
- return;
- }
-
- /* 1. If gossip sender is null, we cannot ask it for missing messages anyway, so discard gossip ! */
- if(gossip.sender == null) {
- if(log.isErrorEnabled()) log.error("sender of gossip is null; " +
- "don't know where to send XMIT_REQ to. Discarding gossip");
- return;
- }
-
- /* 2. Don't process the gossip if the sender of the gossip is not a member anymore. If it is a newly
- joined member, discard it as well (we can't tell the difference). When the new member will be
- added to the membership, then its gossips will be processed */
- if(!members.contains(gossip.sender)) {
- if(warn) log.warn("sender " + gossip.sender +
- " is not a member. Gossip will not be processed");
- if(shun)
- shunInvalidGossiper(gossip.sender);
- return;
- }
-
-
- /* 3. If this gossip was received before, just discard it and return (don't process the
- same gossip twice). This prevents flooding of the gossip sender with retransmission reqs */
- while(gossip_list.size() >= max_gossip_cache) // first delete oldest gossips
- gossip_list.removeFromHead();
-
- if(gossip_list.contains(gossip)) // already received, don't re-broadcast
- return;
- else
- gossip_list.add(gossip.copy()); // add to list of received gossips
-
-
-
- /* 4. Send a HEARD_FROM event containing all members in the gossip-chain down to the FD layer.
- This ensures that we don't suspect them */
- seen_list=gossip.getSeenList();
- if(seen_list.size() > 0)
- passDown(new Event(Event.HEARD_FROM, seen_list.clone()));
-
-
-
- /* 5. Compare their digest against ours. Find out if some messages in the their digest are
- not in our digest. If yes, put them in the 'ht' hashtable for retransmission */
- their_digest=gossip.digest;
-
- Map.Entry entry;
- Address sender;
- com.gemstone.org.jgroups.protocols.pbcast.Digest.Entry val;
- for(Iterator it=their_digest.senders.entrySet().iterator(); it.hasNext();) {
- entry=(Map.Entry)it.next();
- sender=(Address)entry.getKey();
- val=(com.gemstone.org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
- their_low=val.low_seqno;
- their_high=val.high_seqno;
- if(their_low == 0 && their_high == 0)
- continue; // won't have any messages for this sender, don't even re-send
-
- win=(NakReceiverWindow) digest.get(sender);
- if(win == null) {
- // this specific sender in this digest is probably not a member anymore, new digests
- // won't contain it. for now, just ignore it. if it is a new member, it will be in the next
- // gossips
-
- if(warn) log.warn("sender " + sender + " not found, skipping...");
- continue;
- }
-
- my_low=win.getLowestSeen();
- my_high=win.getHighestSeen();
- if(my_high < their_high) {
- // changed by Bela (June 26 2003) - replaced my_high with my_low (not tested though !)
- if(my_low + 1 < their_low) {
- }
- else {
- missing_msgs=win.getMissingMessages(my_high, their_high);
- if(missing_msgs != null) {
- if(log.isInfoEnabled())
- log.info("asking " + gossip.sender + " for retransmission of " +
- sender + ", missing messages: " + missing_msgs + "\nwin for " + sender + ":\n" + win + '\n');
- if(ht == null) ht=new Hashtable();
- ht.put(sender, missing_msgs);
- }
- }
- }
- }
-
-
-
- /* 6. Send a XMIT_REQ to the sender of the gossip. The sender will then resend those messages as
- an XMIT_RSP unicast message (the messages are in its buffer, as a List) */
- if(ht == null || ht.size() == 0) {
- }
- else {
- hdr=new PbcastHeader(PbcastHeader.XMIT_REQ);
- hdr.xmit_reqs=ht;
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.PBCAST_SENDING_XMIT_REQ_TO__0, gossip.sender);
- msg=new Message(gossip.sender, null, null);
- msg.putHeader(getName(), hdr);
- passDown(new Event(Event.MSG, msg));
- }
-
-
-
- /* 7. Remove myself from 'not_seen' list. If not_seen list is empty, we can garbage-collect messages
- smaller than the digest. Since all the members have seen the gossip, it will not be re-sent */
- gossip.removeFromNotSeenList(local_addr);
- if(gossip.sizeOfNotSeenList() == 0) {
- garbageCollect(gossip.digest);
- return;
- }
-
-
-
- /* 8. If we make it to this point, re-send to subset of remaining members in 'not_seen' list */
- new_dests=Util.pickSubset(gossip.getNotSeenList(), subset);
-
-
- if(log.isInfoEnabled()) log.info("(from " + local_addr +
- ") forwarding gossip " + gossip.shortForm() + " to " + new_dests);
- gossip.addToSeenList(local_addr);
- for(int i=0; i < new_dests.size(); i++) {
- dest=(Address) new_dests.elementAt(i);
- msg=new Message(dest, null, null);
- hdr=new PbcastHeader(gossip.copy(), PbcastHeader.GOSSIP);
- msg.putHeader(getName(), hdr);
- passDown(new Event(Event.MSG, msg));
- }
- }
-
-
- /**
- * Find the messages indicated in <code>xmit_reqs</code> and re-send them to
- * <code>requester</code>
- */
- void handleXmitRequest(Address requester, Hashtable xmit_reqs) {
- NakReceiverWindow win;
- Address sender;
- List msgs, missing_msgs, xmit_msgs;
- Message msg;
-
- if(requester == null) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.PBCAST_REQUESTER_IS_NULL);
- return;
- }
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.PBCAST_RETRANSMISSION_REQUESTS_ARE__0, printXmitReqs(xmit_reqs));
- for(Enumeration e=xmit_reqs.keys(); e.hasMoreElements();) {
- sender=(Address) e.nextElement();
- win=(NakReceiverWindow) digest.get(sender);
- if(win == null) {
- if(warn) log.warn("sender " + sender +
- " not found in my digest; skipping retransmit request !");
- continue;
- }
-
- missing_msgs=(List) xmit_reqs.get(sender);
- msgs=win.getMessagesInList(missing_msgs); // msgs to be sent back to requester
-
-
-
- // re-send the messages to requester. don't add a header since they already have headers
- // (when added to the NakReceiverWindow, the headers were not removed)
- xmit_msgs=new List();
- for(Enumeration en=msgs.elements(); en.hasMoreElements();) {
- msg=((Message) en.nextElement()).copy();
- xmit_msgs.add(msg);
- }
-
- // create a msg with the List of xmit_msgs as contents, add header
- msg=new Message(requester, null, xmit_msgs);
- msg.putHeader(getName(), new PbcastHeader(PbcastHeader.XMIT_RSP));
- passDown(new Event(Event.MSG, msg));
- }
- }
-
-
- void handleXmitRsp(List xmit_msgs) {
- Message m;
- PbcastHeader hdr;
-
- for(Enumeration e=xmit_msgs.elements(); e.hasMoreElements();) {
- m=(Message) e.nextElement();
- hdr=(PbcastHeader) m.removeHeader(getName());
- if(hdr == null) {
- log.warn("header is null, ignoring message");
- }
- else {
- if(log.isInfoEnabled()) log.info("received #" + hdr.seqno + ", type=" +
- PbcastHeader.type2String(hdr.type) + ", msg=" + m);
- handleUpMessage(m, hdr);
- }
- }
- }
-
-
- String printXmitReqs(Hashtable xmit_reqs) {
- StringBuffer sb=new StringBuffer();
- Address key;
- boolean first=true;
-
- if(xmit_reqs == null)
- return "<null>";
-
- for(Enumeration e=xmit_reqs.keys(); e.hasMoreElements();) {
- key=(Address) e.nextElement();
- if(!first) {
- sb.append(", ");
- }
- else
- first=false;
- sb.append(key + ": " + xmit_reqs.get(key));
- }
- return sb.toString();
- }
-
-
- void garbageCollect(Digest d) {
- Address sender;
- long tmp_seqno;
- NakReceiverWindow win;
- Map.Entry entry;
- com.gemstone.org.jgroups.protocols.pbcast.Digest.Entry val;
-
- for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {
- entry=(Map.Entry)it.next();
- sender=(Address)entry.getKey();
- val=(com.gemstone.org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
- win=(NakReceiverWindow)digest.get(sender);
- if(win == null) {
- if(log.isDebugEnabled()) log.debug("sender " + sender +
- " not found in our message digest, skipping");
- continue;
- }
- tmp_seqno=val.high_seqno;
- tmp_seqno=Math.max(tmp_seqno - gc_lag, 0);
- if(tmp_seqno <= 0) {
- continue;
- }
-
- if(trace) log.trace("(from " + local_addr +
- ") GC: deleting messages < " + tmp_seqno + " from " + sender);
- win.stable(tmp_seqno);
- }
- }
-
-
- /**
- * If sender of gossip is not a member, send a NOT_MEMBER to sender (after n gossips received).
- * This will cause that member to leave the group and possibly re-join.
- */
- void shunInvalidGossiper(Address invalid_gossiper) {
- int num_pings=0;
- Message shun_msg;
-
- if(invalid_gossipers.containsKey(invalid_gossiper)) {
- num_pings=((Integer) invalid_gossipers.get(invalid_gossiper)).intValue();
- if(num_pings >= max_invalid_gossips) {
-
- if(log.isInfoEnabled()) log.info("sender " + invalid_gossiper +
- " is not member of " + members + " ! Telling it to leave group");
- shun_msg=new Message(invalid_gossiper, null, null);
- shun_msg.putHeader(getName(), new PbcastHeader(PbcastHeader.NOT_MEMBER));
- passDown(new Event(Event.MSG, shun_msg));
- invalid_gossipers.remove(invalid_gossiper);
- }
- else {
- num_pings++;
- invalid_gossipers.put(invalid_gossiper, Integer.valueOf(num_pings));
- }
- }
- else {
- num_pings++;
- invalid_gossipers.put(invalid_gossiper, Integer.valueOf(num_pings));
- }
- }
-
-
- /** Computes the gossip_interval. See DESIGN for details */
- long computeGossipInterval(int num_mbrs, double desired_avg_gossip) {
- return getRandom((long) (num_mbrs * desired_avg_gossip * 2));
- }
-
-
- long getRandom(long range) {
- return (long) ((Math.random() * range) % range);
- }
-
-
- /* ------------------------------- End of Private Methods ---------------------------------------- */
-
-
- private static class GossipEntry {
- PbcastHeader hdr=null;
- Address sender=null;
- byte[] data=null;
-
- GossipEntry(PbcastHeader hdr, Address sender, byte[] data) {
- this.hdr=hdr;
- this.sender=sender;
- this.data=data;
- }
-
- @Override // GemStoneAddition
- public String toString() {
- return "hdr=" + hdr + ", sender=" + sender + ", data=" +
-// data
- (data == null ? "null" : "(" + data.length + " bytes)") // GemStoneAddition
- ;
- }
- }
-
-
- /**
- Handles gossip and retransmission requests. Removes requests from a (bounded) queue.
- */
- private class GossipHandler implements Runnable {
- Thread t=null;
- final Queue queue;
-
-
- GossipHandler(Queue q) {
- queue=q;
- }
-
-
- synchronized /* GemStoneAddition */ void start() {
- if(t == null) {
- t=new Thread(this, "PBCAST.GossipHandlerThread");
- t.setDaemon(true);
- t.start();
- }
- }
-
-
- synchronized /* GemStoneAddition */ void stop() {
- Thread tmp;
- if(t != null && t.isAlive()) {
- tmp=t;
- t=null;
- if(queue != null)
- queue.close(false); // don't flush elements
- tmp.interrupt();
- }
- t=null;
- }
-
-
- public void run() {
- GossipEntry entry;
- PbcastHeader hdr;
- List xmit_msgs;
- byte[] data;
-
- for (;;) { // GemStoneAddition -- remove anti-pattern
- if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition
- try {
- entry=(GossipEntry) queue.remove();
- hdr=entry.hdr;
- if(hdr == null) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.PBCAST_GOSSIP_ENTRY_HAS_NO_PBCASTHEADER);
- continue;
- }
-
- switch(hdr.type) {
-
- case PbcastHeader.GOSSIP:
- handleGossip(hdr.gossip);
- break;
-
- case PbcastHeader.XMIT_REQ:
- if(hdr.xmit_reqs == null) {
- if(warn) log.warn("request is null !");
- break;
- }
- handleXmitRequest(entry.sender, hdr.xmit_reqs);
- break;
-
- case PbcastHeader.XMIT_RSP:
- data=entry.data;
- if(data == null) {
- if(warn) log.warn("buffer is null (no xmitted msgs)");
- break;
- }
- try {
- xmit_msgs=(List) Util.objectFromByteBuffer(data);
- }
- catch(Exception ex) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.PBCAST_FAILED_CREATING_RETRANSMITTED_MESSAGES_FROM_BUFFER, ex);
- break;
- }
- handleXmitRsp(xmit_msgs);
- break;
-
- case PbcastHeader.NOT_MEMBER: // we are shunned
- if(shun) {
- if(log.isInfoEnabled()) log.info(ExternalStrings.PBCAST_I_AM_BEING_SHUNNED_WILL_LEAVE_AND_REJOIN);
- passUp(new Event(Event.EXIT));
- }
- break;
-
- default:
- if(log.isErrorEnabled()) log.error("type (" + hdr.type +
- ") of PbcastHeader not known !");
- return;
- }
- }
- catch (InterruptedException ie) { // GemStoneAddition
- // no need to reset interrupt; we're exiting
- break;
- }
- catch(QueueClosedException closed) {
- break;
- }
- }
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/ParticipantGmsImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/ParticipantGmsImpl.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/ParticipantGmsImpl.java
deleted file mode 100644
index 919f6e1..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/ParticipantGmsImpl.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: ParticipantGmsImpl.java,v 1.17 2005/12/23 14:57:06 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols.pbcast;
-
-
-import com.gemstone.org.jgroups.*;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.Promise;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.Vector;
-
-
-public class ParticipantGmsImpl extends GmsImpl {
- private final Vector suspected_mbrs=new Vector(11);
- private final Set<Address> departed_mbrs = new HashSet<Address>(); // GemStoneAddition
- private final Promise leave_promise=new Promise();
-
-
- /**
- * Creates a ParticipantGmsImpl for the given GMS instance.
- * @param g the GMS instance, passed straight to the superclass constructor
- */
- public ParticipantGmsImpl(GMS g) {
- super(g);
- }
-
-
-    /**
-     * Re-initializes this implementation: runs the superclass initialization, empties the
-     * list of suspected members and resets the promise that leave() waits on.
-     */
-    @Override // GemStoneAddition
-    public void init() throws Exception {
-        super.init();
-        synchronized(suspected_mbrs) { // GemStoneAddition
-            suspected_mbrs.clear(); // Vector.clear() is identical in effect to removeAllElements()
-        }
-        leave_promise.reset();
-    }
-
- /**
- * Not handled by this implementation: the call is forwarded to wrongMethod("join")
- * and false is always returned.
- * @param mbr the member asking to join (unused)
- * @return always false
- */
- @Override // GemStoneAddition
- public boolean join(Address mbr) { // GemStoneAddition - return boolean
- wrongMethod("join");
- return false;
- }
-
-
-    /**
-     * Processes a batch of membership events: every suspect is run through
-     * handleSuspect(Address), then a non-empty list of leaves is handed to
-     * handleLeave(List, boolean, List, boolean) as a regular (non-suspected) departure.
-     * The joins, suspectReasons and forceInclusion arguments are not used here.
-     *
-     * @param joins not used by this implementation
-     * @param leaves members that left; may be null or empty
-     * @param suspects members to treat as suspected; may be null or empty
-     * @param suspectReasons not used by this implementation
-     * @param forceInclusion not used by this implementation
-     */
-    @Override
-    public void handleJoinsAndLeaves(List joins, List leaves, List suspects, List suspectReasons, boolean forceInclusion) {
-        if (suspects != null) { // tolerate a missing list, as is already done for leaves
-            for (Iterator it=suspects.iterator(); it.hasNext(); ) {
-                handleSuspect((Address)it.next());
-            }
-        }
-        if (leaves != null && !leaves.isEmpty()) {
-            handleLeave(leaves, false, null, false);
-        }
-    }
-
-
- /**
- * This is used when we are shutting down to inform others.
- * Loop (at most 3 iterations, and only while a coordinator can be determined):
- * <ul>
- * <li>If the coordinator is this member and we are the one leaving, stop looping.</li>
- * <li>If the coordinator is this member and someone else is leaving, become coordinator,
- * delegate leave(mbr) to the implementation that is then current, and return.</li>
- * <li>Otherwise send a LEAVE request to the coordinator and wait on leave_promise; stop
- * looping once a result arrives, once the coordinator is suspected or departed, or
- * when the coordinator has not changed since the request was sent.</li>
- * </ul>
- * Unless the method returned from inside the loop, it finishes with gms.becomeClient().
- * @param mbr the member that is leaving; if it equals the local address, leaving is set to true
- */
- @Override // GemStoneAddition
- public void leave(Address mbr) {
- Address coord;
- int max_tries=3;
- Object result;
-
- leave_promise.reset();
-
- if(mbr.equals(gms.local_addr))
- leaving=true;
-
- while ((coord=gms.determineCoordinator()) != null && max_tries-- > 0) {
- if(gms.local_addr.equals(coord)) { // I'm the coordinator
- if (leaving) { // don't know who to tell that we're shutting down - bale out
- break; // GemStoneAddition - bug #42969 hang during shutdown
- }
- gms.becomeCoordinator(this.suspected_mbrs);
- // gms.getImpl().handleLeave(mbr, false); // regular leave
- gms.getImpl().leave(mbr); // regular leave
- return;
- }
-
- if(log.isDebugEnabled()) log.debug("sending LEAVE request to " + coord + " (local_addr=" + gms.local_addr + ")");
- sendLeaveMessage(coord, mbr);
-
- // GemStoneAddition - if I'm the coordinator
- // don't wait for a response
-
- // GemStoneAddition - scale up the leave_timeout if there are a lot of
- // members
- // NOTE(review): with fewer than 10 members this expression yields less than
- // gms.leave_timeout (and 0 for an empty membership) - confirm that is intended
- long leaveTimeout = gms.leave_timeout * gms.members.size() / 10;
- synchronized(leave_promise) {
- result=leave_promise.getResult(leaveTimeout);
- if(result != null)
- break;
- }
-
- // GemStoneAddition - we used to only break out of the loop
- // if wouldIBeCoordinator returned true, but we've already
- // sent a ShutdownMessage in GemFire so the other members know
- // that this process shut down normally, so no need to wait
- // for a new view if the coordinator is probably gone
-
- synchronized(this.suspected_mbrs) {
- if (this.suspected_mbrs.contains(coord)) {
- break;
- }
- }
-
- synchronized(this.departed_mbrs) {
- if (this.departed_mbrs.contains(coord)) {
- break;
- }
- }
-
- // GemStoneAddition - also just quit if the coordinator hasn't
- // changed after LEAVE has already been sent (and probably retransmitted)
- // to it
- // NOTE(review): this is a reference comparison, not equals(); it only matches if
- // determineCoordinator() hands back the same Address instance - confirm
- if (gms.determineCoordinator() == coord) {
- break;
- }
-
- }
- gms.becomeClient();
- }
-
-    /**
-     * Returns a copy of the currently suspected members, taken while holding the list's lock.
-     * @return a new Vector holding the suspects at the time of the call
-     */
-    public Vector getSuspects() { // GemStoneAddition
-        final Vector snapshot;
-        synchronized(this.suspected_mbrs) {
-            snapshot = new Vector(this.suspected_mbrs);
-        }
-        return snapshot;
-    }
-
-
- /**
- * Join responses are ignored by this implementation; the wrongMethod() call that
- * used to be here is commented out.
- * @param join_rsp the join response (unused)
- */
- @Override // GemStoneAddition
- public void handleJoinResponse(JoinRsp join_rsp) {
- // wrongMethod("handleJoinResponse");
- }
-
-    /**
-     * Handles the response to a leave request. A non-empty reason means this member was
-     * forced out: an EXIT event carrying a forced-disconnect exception is passed up. In every
-     * case the leave promise is completed so that a thread blocked in leave() can continue.
-     *
-     * @param reason why this member was removed; null or empty for an ordinary leave
-     */
-    @Override // GemStoneAddition
-    public void handleLeaveResponse(String reason) {
-        final boolean forcedOut = reason != null && reason.length() > 0;
-        if (forcedOut) {
-            gms.passUp(new Event(Event.EXIT,
-                gms.stack.gfBasicFunctions.getForcedDisconnectException(
-                    "This member has been forced out of the distributed system. Reason='"
-                    + reason + "'")));
-        }
-        synchronized(leave_promise) {
-            // unblocks thread waiting in leave()
-            leave_promise.setResult(Boolean.TRUE);
-        }
-    }
-
-
- /**
- * Reports a member as suspected; simply delegates to handleSuspect(Address).
- * @param mbr the suspected member
- */
- public void suspect(Address mbr) {
- handleSuspect(mbr);
- }
-
-
-    /**
-     * Takes a previously suspected member off the list of currently suspected members.
-     * A null argument leaves the list untouched.
-     *
-     * @param mbr the member that is no longer suspected
-     */
-    @Override // GemStoneAddition
-    public void unsuspect(Address mbr) {
-        synchronized(suspected_mbrs) { // GemStoneAddition
-            if (mbr == null) {
-                return;
-            }
-            suspected_mbrs.remove(mbr);
-        }
-    }
-
-
- /**
- * No-op in this implementation: the method body is empty.
- * @param mbr the joining member (unused)
- */
- @Override // GemStoneAddition
- public void handleJoin(Address mbr) {
- }
-
-// @Override // GemStoneAddition
-// public void handleJoin(List mbrList) { // GemStoneAddition
-// }
-
-
-    /**
-     * Single-member convenience form: a suspected member goes through handleSuspect(Address);
-     * otherwise the member is wrapped in a one-element list and processed as a regular leave.
-     *
-     * @param mbr the member that left or is suspected
-     * @param suspected true to treat the member as a suspect rather than a normal departure
-     */
-    public void handleLeave(Address mbr, boolean suspected) {
-        if (suspected) { // GemStoneAddition
-            handleSuspect(mbr);
-            return;
-        }
-        final List single = Collections.singletonList(mbr);
-        handleLeave(single, false, null, false);
-    }
-
-    // GemStoneAddition - list of mbrs and 'reason'
-    /**
-     * Handles the departure of a list of members.
-     * <p>
-     * If suspected is true, each member is passed to handleSuspect(Address) for as long as this
-     * object is still the active GMS implementation. Otherwise the members are added to the
-     * departed set; if removing all suspected and departed members from the membership would
-     * make this member the coordinator, those members are taken out of the pending collections,
-     * the logical time is advanced by 10 and this member becomes coordinator.
-     *
-     * @param members the members that left (or are suspected)
-     * @param suspected true to treat the members as suspects
-     * @param reasons not used by this implementation
-     * @param forceInclusion not used by this implementation
-     */
-    @Override // GemStoneAddition
-    public void handleLeave(List members, boolean suspected, List reasons, boolean forceInclusion) {
-        if (suspected) { // GemStoneAddition
-            // stop early if handling a suspect switched the active implementation
-            for (Iterator it=members.iterator(); it.hasNext() && (gms.getImpl() == this); ) {
-                handleSuspect((Address)it.next());
-            }
-            return;
-        }
-
-        // Work on snapshots so that each lock is held only briefly and never nested.
-        final Vector suspects;
-        final Set<Address> departed;
-        synchronized(this.suspected_mbrs) {
-            suspects = new Vector(suspected_mbrs);
-        }
-        synchronized(this.departed_mbrs) {
-            this.departed_mbrs.addAll(members);
-            departed = new HashSet<Address>(this.departed_mbrs);
-        }
-
-        if (!wouldIBeCoordinator(suspects, departed)) {
-            return;
-        }
-
-        // Log the snapshots rather than the live fields: departed_mbrs is a plain HashSet and
-        // must not be read outside its lock, and the snapshots are what the decision used.
-        if(log.isDebugEnabled()) log.debug("suspected mbrs=" + suspects + "; departed="+departed
-            + "; members are " +
-            gms.members + ", coord=" + gms.local_addr + ": I'm the new coord !");
-
-        // Only remove what was in the snapshots; entries added meanwhile stay pending.
-        synchronized(this.suspected_mbrs) {
-            suspected_mbrs.removeAll(suspects);
-        }
-        synchronized(this.departed_mbrs) {
-            this.departed_mbrs.removeAll(departed);
-        }
-        gms.incrementLtime(10);
-        gms.becomeCoordinator(suspects);
-    }
-
-
- /**
- * If we are leaving, we have to wait for the view change (last msg in the current view) that
- * excludes us before we can leave.
- * <p>
- * The suspected-member list and the departed-member set are both cleared before the view is
- * examined. A view that does not contain the local address is ignored when this member is
- * leaving; when it is not leaving, an EXIT event carrying a forced-disconnect exception is
- * passed up and the view is then still handed to gms.installView().
- * @param new_view The view to be installed
- * @param digest If view is a MergeView, digest contains the seqno digest of all members and has to
- * be set by GMS
- */
- @Override // GemStoneAddition
- public void handleViewChange(View new_view, Digest digest) {
- Vector mbrs=new_view.getMembers();
- if(log.isDebugEnabled()) log.debug("view=" + new_view);
- // a new view supersedes whatever suspicions and departures were pending
- synchronized(suspected_mbrs) { // GemStoneAddition
- suspected_mbrs.removeAllElements();
- }
- synchronized(this.departed_mbrs) {
- this.departed_mbrs.clear();
- }
- if(!mbrs.contains(gms.local_addr)) {
- if (leaving) {
- // received a view in which I'm not member: ignore
- return;
- }
- else {
- // GemStoneChange - not leaving, yet the new view excludes this member: it means
- // this process has been ousted as a suspect
- gms.passUp(new Event(Event.EXIT,
- gms.stack.gfBasicFunctions.getForcedDisconnectException(
- ExternalStrings.
- PGMS_THIS_MEMBER_HAS_BEEN_FORCED_OUT_OF_THE_DISTRIBUTED_SYSTEM_PLEASE_CONSULT_GEMFIRE_LOGS_TO_FIND_THE_REASON_PGMS
- .toLocalizedString(new_view.getCreator()))));
- // no return here: execution falls through to installView() below
- }
- }
- gms.installView(new_view, digest);
- }
-
-
-    /**
-     * Records a member as suspected and, if removing every suspected member from the
-     * membership would make this member the coordinator, takes over as coordinator: the
-     * suspect list and the departed set are emptied, the logical time is advanced by 10 and
-     * gms.becomeCoordinator() is called with the suspects collected so far.
-     *
-     * @param mbr the suspected member; null is ignored
-     */
-    public void handleSuspect(Address mbr) {
-        if (mbr == null) return;
-
-        Vector suspects = null; // set only when this member is about to take over as coordinator
-
-        synchronized (suspected_mbrs) { // GemStoneAddition - bug 35063
-            // Both contains() and addElement() are synchronized, but
-            // there is a window between the two, so we must synchronize.
-            if(!suspected_mbrs.contains(mbr))
-                suspected_mbrs.addElement(mbr);
-
-            if(log.getLogWriter().fineEnabled()) log.getLogWriter().fine(
-                "PGMS: suspected mbr=" + mbr + ", suspected_mbrs=" + suspected_mbrs + ", members=" +
-                gms.members + ", local_addr=" + gms.local_addr);
-
-            // typed empty set instead of the raw Collections.EMPTY_SET constant
-            if(wouldIBeCoordinator(this.suspected_mbrs, Collections.<Address>emptySet())) {
-                // message no longer contains the stray ')' after the member
-                if(log.isDebugEnabled()) log.debug("suspected mbr=" + mbr + "; departed="+this.departed_mbrs
-                    + "; members are " +
-                    gms.members + ", coord=" + gms.local_addr + ": I'm the new coord !");
-
-                suspects = new Vector(suspected_mbrs);
-                suspected_mbrs.removeAllElements();
-            }
-        } // synchronized
-
-        if (suspects != null) {
-            synchronized(this.departed_mbrs) {
-                this.departed_mbrs.clear();
-            }
-            gms.incrementLtime(10);
-            gms.becomeCoordinator(suspects);
-            // gms.getImpl().suspect(mbr);
-        }
-    }
-
- /**
- * Rejects every merge request by answering the sender via sendMergeRejectedResponse().
- * @param sender the member that sent the merge request
- * @param merge_id the id of the merge being requested
- */
- @Override // GemStoneAddition
- public void handleMergeRequest(Address sender, ViewId merge_id) {
- // only coords handle this method; reject it if we're not coord
- sendMergeRejectedResponse(sender, merge_id);
- }
-
- /* ---------------------------------- Private Methods --------------------------------------- */
-
-    /**
-     * Determines whether this member would be the new coordinator once the given suspected and
-     * departed members are taken out of the current membership. The coordinator of the remaining
-     * members is computed via Membership.getCoordinator() and compared with local_addr.
-     * Example: own address is B, current membership is {A, B, C, D}, suspected members are
-     * {A, D}. The resulting list is {B, C}; if its coordinator is B, which is equal to the
-     * local_addr, true is returned.
-     *
-     * @param suspects members that have crashed; may be null
-     * @param departures members that have departed normally; may be null
-     * @return true if the local address is the coordinator of the remaining members; false if
-     *         no members remain or no coordinator can be determined
-     */
-    boolean wouldIBeCoordinator(Vector suspects, Set<Address> departures) {
-        Vector mbrs=gms.members.getMembers(); // getMembers() returns a *copy* of the membership vector
-
-        if (log.isDebugEnabled()) {
-            // log the arguments the decision is based on, not the instance fields
-            log.debug("wouldIBeCoordinator:\nmembers = " + mbrs + "\ndeparted = " + departures
-                + "\nsuspected = " + suspects);
-        }
-
-        if (suspects != null) {
-            mbrs.removeAll(suspects);
-        }
-
-        if (departures != null) {
-            mbrs.removeAll(departures);
-        }
-
-        if (mbrs.isEmpty()) return false;
-        // GemStoneAddition - revised for split-brain detection
-        Address new_coord = new Membership(mbrs).getCoordinator();
-//      log.getLogWriterI18n().info(JGroupsStrings.DEBUG, "pgms: of members " + mbrs + " the coordinator would be " + new_coord);
-        if (new_coord == null) { // oops - no eligible coordinators
-            return false;
-        }
-        return gms.local_addr.equals(new_coord);
-    }
-
-
- void sendLeaveMessage(Address coord, Address mbr) {
- Message msg=new Message(coord, null, null);
- GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.LEAVE_REQ, mbr);
-
- msg.putHeader(gms.getName(), hdr);
- gms.passDown(new Event(Event.MSG, msg));
- }
-
-
-// @Override // GemStoneAddition
-// public void suspect(Address mbr, String reason) {
-// suspect(mbr);
-// }
-
-
- /* (non-Javadoc) GemStoneAddition
- * No-op in this implementation: the method body is empty and mbr is not used.
- * @see com.gemstone.org.jgroups.protocols.pbcast.GmsImpl#handleAlreadyJoined(com.gemstone.org.jgroups.Address)
- */
- @Override // GemStoneAddition
- public void handleAlreadyJoined(Address mbr) {
- }
-
-
- /* ------------------------------ End of Private Methods ------------------------------------ */
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/PbcastHeader.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/PbcastHeader.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/PbcastHeader.java
deleted file mode 100644
index 4f578ee..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/PbcastHeader.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: PbcastHeader.java,v 1.3 2004/07/05 05:49:41 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols.pbcast;
-
-import com.gemstone.org.jgroups.Header;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Hashtable;
-
-
-
-
-/**
- * Header carried by PBCAST messages. It holds the message type, a sequence number and,
- * optionally, a gossip and a table of retransmission requests. All four fields are written
- * and read in the same fixed order by writeExternal() / readExternal().
- */
-public class PbcastHeader extends Header {
-    public static final int MCAST_MSG  = 0;  // regular multicast message
-    public static final int GOSSIP     = 1;  // gossip message (unicast)
-    public static final int XMIT_REQ   = 2;  // retransmit request (unicast)
-    public static final int XMIT_RSP   = 3;  // retransmit response (unicast)
-    public static final int NOT_MEMBER = 4;  // shun message (unicast)
-
-
-
-    int       type=-1;         // one of the constants above; -1 until set
-    long      seqno=-1;        // -1 until set (original note: we start out with 0 as first seqno for an mcast message)
-    Gossip    gossip=null;     // used to identify gossips, implements the equals() and hashCode() methods
-    Hashtable xmit_reqs=null;  // for XMIT_REQs. keys=sender, vals=List of Longs (list of missing msgs)
-
-
-
-    /** Creates a header with type -1; the remaining fields keep their defaults. */
-    public PbcastHeader() {
-        type=-1;
-    }
-
-
-    /** Creates a header of the given type. */
-    public PbcastHeader(int type) {
-        this.type=type;
-    }
-
-
-    /** Creates a header of the given type carrying a sequence number. */
-    public PbcastHeader(int type, long seqno) {
-        this.type=type;
-        this.seqno=seqno;
-    }
-
-
-    /** Creates a header of the given type carrying a gossip. */
-    public PbcastHeader(Gossip g, int type) {
-        this.type=type;
-        gossip=g;
-    }
-
-
-    /** Creates a header of the given type carrying both a gossip and a sequence number. */
-    public PbcastHeader(Gossip g, int type, long seqno) {
-        this.type=type;
-        this.seqno=seqno;
-        gossip=g;
-    }
-
-
-
-    /** Returns the sequence number stored in this header. */
-    public long getSeqno() {return seqno;}
-
-
-    /** Renders the type name, the seqno and, when present, the gossip. */
-    @Override // GemStoneAddition
-    public String toString() {
-        // method-local builder: no need for StringBuffer's synchronization
-        StringBuilder sb=new StringBuilder();
-        sb.append("[PBCAST(").append(type2String(type)).append("), seqno=").append(seqno);
-        if(gossip != null) sb.append(", gossip=").append(gossip);
-        sb.append(']');
-        return sb.toString();
-    }
-
-
-    /** Returns a fixed value of 500, independent of the version argument and of the contents. */
-    @Override // GemStoneAddition
-    public long size(short version) {
-        return 500;
-    }
-
-
-    /**
-     * Maps a type constant to its name.
-     * @param t one of the type constants of this class
-     * @return the constant's name, or "UNKNOWN" for any other value
-     */
-    public static String type2String(int t) {
-        switch(t) {
-            case MCAST_MSG:  return "MCAST_MSG";
-            case GOSSIP:     return "GOSSIP";
-            case XMIT_REQ:   return "XMIT_REQ";
-            case XMIT_RSP:   return "XMIT_RSP";
-            case NOT_MEMBER: return "NOT_MEMBER";
-            default:         return "UNKNOWN";
-        }
-    }
-
-
-    /** Writes type, seqno, gossip and xmit_reqs, in that order. */
-    public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeInt(type);
-        out.writeLong(seqno);
-        out.writeObject(gossip);
-        out.writeObject(xmit_reqs);
-    }
-
-
-
-    /** Reads the fields back in the order used by writeExternal(). */
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        type=in.readInt();
-        seqno=in.readLong();
-        gossip=(Gossip)in.readObject();
-        xmit_reqs=(Hashtable)in.readObject();
-    }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/README
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/README b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/README
deleted file mode 100644
index 96e9a43..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/README
+++ /dev/null
@@ -1,140 +0,0 @@
-
-
-
- Probabilistic Broadcast for JavaGroups
- ======================================
-
-
-
-JavaGroups currently uses virtual synchrony (VS) in its main protocol
-suite. VS is suited for tightly coupled, lockstep replication. Typical
-examples are clusters, replicated databases etc. Group size is 100
-max, and it is targeted to LANs rather than WANs.
-
-The problem with VS is that it has to enforce that all members have
-received all messages in a view before proceeding to the next
-view. This is done by a FLUSH protocol, which ensures (by
-retransmission) that each member has seen all messages in the current
-view. During the FLUSH protocol, all members are essentially
-blocked. Messages can be sent, but they will be sent only when the
-FLUSH protocol has terminated (in one of the subsequent views, not in
-the current one). The FLUSH protocol itself may need to be restarted,
-e.g. in the case when a participating member fails during the FLUSH.
-
-When one node (or a link) in a VS group is slow, it will bring the
-performance of the entire group down, as members proceed at the pace
-of the slowest members (at least during membership
-changes). (Otherwise, the likely result is just growing buffers and
-retransmissions, as messages waiting to be delivered are buffered).
-
-The bimodal multicast (or probabilistic broadcast) protocols (PBCAST)
-developed at Cornell try to solve this problem by providing
-probabilistic reliability guarantees rather than hard ones. In a
-nutshell, the probability of a very small number of members receiving
-a message is high and the probability of all members receiving it is
-high as well. The probability of only some members receiving a message is
-very small, because the 'epidemic' nature of PBCAST infects the group
-exponentially, making sure every member receives a message, or none.
-
-PBCAST protocols therefore scale very well, both in terms of group
-member size as well as over WANs with intermittent link/node
-failures. By implementing a PBCAST protocol, JavaGroups can now be
-used in WAN settings. However, there are no hard reliability
-guarantees anymore, just probabilistic ones. Yet there are a number of
-applications, which don't need hard reliability, and can live with
-probabilistic guarantees, for example replicated naming services and
-publish-subscribe applications. In these settings, eventual
-convergence of replicated state and low cost of the protocol are more
-important than lock-step replication.
-
-The JavaGroups API will not be changed at all. However, applications
-with a protocol stack configured to use PBCAST have to be aware that
-views are only an approximation of the membership, not a hard
-guarantee.
-
-The PBCAST protocol is located in the ./pbcast subdirectory of
-./Protocols. The major changes are:
-
-
-GMS
----
-Unlike VS, the JavaGroups implementation of PBCAST does not per se
-guarantee that the set of messages delivered in a view V is the same
-at all members. Therefore, applications cannot rely on the fact that
-when they send a message in view V, it will be received by all current
-non-faulty members in V.
-
-Views are delivered at each receiver at a certain position in the
-incoming message stream. However, as PBCAST only provides FIFO (which
-guarantees that messages from sender P are seen in the order sent by
-P), it is possible that messages sent by senders P and Q in view V1
-can be received in different views at each receiver. However, it is
-possible to add total order by implementing a TOTAL protocol and
-adding it on top of a given protocol stack. This would then
-essentially provide VS.
-
-Consider the following example: P sends messages m1 and m2 in view V1
-(consisting of P, Q and R). While it sends the messages, a new member
-S joins the group. Since there is no FLUSH protocol that ensures that
-m1 and m2 are delivered in V1, the following could happen: m1 is
-delivered to Q and R in V1. Message m2 is delivered to Q, but is lost
-to R (e.g. dropped by a lossy link). Now, the new view V2 is installed
-by Q (which is the coordinator). Now, m2 is retransmitted by P to
-R. Clearly, VS would drop m2 because it was sent in a previous
-view. However, PBCAST faces two choices: either accept the message and
-deliver it or drop it as well. If we accept it, the FIFO properties
-for P are upheld, if we drop it, the next message m3 from P will not
-be delivered until m2 was seen by R. (Message IDs are not reset to 0
-because we have no total order over views being delivered at each
-member at the same location in the message stream, as shown
-above). Therefore, we have to accept the message.
-
-This leads to the conclusion that views are not used as a demarcation
-between message sets, but rather as indication that the group
-membership has changed. Therefore, protocols in the PBCAST suite will
-only use views to update their internal membership list, but never
-make the assumption that all members will see the view change at the
-same logical location in their message streams.
-
-
-FLUSH
------
-Not used anymore, as we're not flushing messages when proceeding to
-the next view.
-
-
-NAKACK
-------
-Not used anymore. Functionality will be covered by PBCAST. NAKACK made
-assumptions about views and messages and can therefore not be used.
-
-
-VIEW_ENFORCER
--------------
-Not used anymore. Messages sent in one view can be delivered in
-another one, although this usually doesn't happen. But we cannot make
-any assumptions about it.
-
-
-STATE_TRANSFER
---------------
-Not used anymore. New protocol for state transfer, especially geared
-towards big states (transfer in multiple transfers). However,
-STATE_TRANSFER could still be used (a TOTAL protocol has to be
-present).
-
-
-QUEUE
------
-May be used by the new state transfer protocol
-
-
-STABLE
-------
-Not used anymore. Functionality will be covered by PBCAST protocol.
-
-
-
-Refs
-----
-[1] http://www.cs.cornell.edu/Info/Projects/Spinglass/index.html