You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/08/21 23:23:12 UTC

[48/51] [partial] incubator-geode git commit: GEODE-77 removing the old jgroups subproject

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/View.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/View.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/View.java
deleted file mode 100644
index 5a8eada..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/View.java
+++ /dev/null
@@ -1,528 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-// $Id: View.java,v 1.10 2005/08/08 09:48:06 belaban Exp $
-
-package com.gemstone.org.jgroups;
-
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.Vector;
-
-import com.gemstone.org.jgroups.protocols.pbcast.Digest;
-import com.gemstone.org.jgroups.stack.IpAddress;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.Streamable;
-import com.gemstone.org.jgroups.util.StreamableFixedID;
-import com.gemstone.org.jgroups.util.Util;
-import com.gemstone.org.jgroups.util.VersionedStreamable;
-
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-
-/**
- * A view is a local representation of the current membership of a group.
- * Only one view is installed in a channel at a time.
- * Views contain the address of their creator, an ID and a list of member addresses.
- * These addresses are ordered, and the first address is always the coordinator of the view.
- * This way, each member of the group knows who the new coordinator will be if the current one
- * crashes or leaves the group.
- * The views are sent between members using the VIEW_CHANGE event.
- * <p>
- * A view created with the empty constructor has a null view id and a null
- * member list until it is populated by {@link #readExternal} or
- * {@link #fromData}; the accessors of this class tolerate that state.
- */
-public class View implements Externalizable, Cloneable, StreamableFixedID {
-  
-  // GemStoneAddition - added for size check on views that carry
-  // credentials (currently disabled)
-//  public static int MAX_VIEW_SIZE = 60000;
-  
-    /* A view is uniquely identified by its ViewID
-     * The view id contains the creator address and a Lamport time.
-     * The Lamport time is the highest timestamp seen or sent from a view.
-     * if a view change comes in with a lower Lamport time, the event is discarded.
-     */
-    protected ViewId vid;
-
-    /**
-     * A list containing all the members of the view
-     * This list is always ordered, with the coordinator being the first member.
-     * the second member will be the new coordinator if the current one disappears
-     * or leaves the group.
-     */
-    protected Vector members;
-
-    /**
-     * GemStoneAddition -- any additional payload to be sent
-     * Used by the security AUTH layer to add/verify credentials.
-     */
-    private Object additionalData;
-    
-    /**
-     * GemStoneAddition - size of serialized form of additionalData
-     */
-    private int additionalDataSize;
-
-    /**
-     * GemStoneAddition - Members removed from previous view as Suspects
-     */
-    private Set suspectedMembers;
-    
-    /**
-     * GemStoneAddition - message digest moved from GmsHeader for FRAG3
-     * fragmentation
-     */
-    private Digest messageDigest;
-    
-    /**
-     * creates an empty view, should not be used
-     */
-    public View() {
-    }
-
-
-    /**
-     * Creates a new view
-     *
-     * @param vid     The view id of this view (can not be null)
-     * @param members Contains a list of all the members in the view, can be empty but not null.
-     */
-    public View(ViewId vid, Vector members) {
-        this.vid=vid;
-        this.members=members;
-    }
-
-
-    /**
-     * Creates a new view
-     *
-     * @param creator The creator of this view (can not be null)
-     * @param id      The lamport timestamp of this view
-     * @param members Contains a list of all the members in the view, can be empty but not null.
-     */
-    public View(Address creator, long id, Vector members) {
-      this(new ViewId(creator, id), members);
-    }
-
-    /**
-     * Creates a new view
-     *
-     * @param creator The creator of this view (can not be null)
-     * @param id      The lamport timestamp of this view
-     * @param members Contains a list of all the members in the view, can be empty but not null.
-     * @param suspectedMembers GemStoneAddition - tracking of ousted members; may be null,
-     *                         otherwise its elements are copied into a new set
-     */
-    public View(Address creator, long id, Vector members, Vector suspectedMembers) {
-        this(new ViewId(creator, id), members);
-        if (suspectedMembers != null) {
-          this.suspectedMembers = new HashSet(suspectedMembers);
-        }
-    }
-
-
-    /**
-     * returns the view ID of this view
-     * if this view was created with the empty constructor, null will be returned
-     *
-     * @return the view ID of this view
-     */
-    public ViewId getVid() {
-        return vid;
-    }
-
-    /**
-     * returns the creator of this view
-     * if this view was created with the empty constructor, null will be returned
-     *
-     * @return the creator of this view in form of an Address object
-     */
-    public Address getCreator() {
-        return vid != null ? vid.getCoordAddress() : null;
-    }
-    
-    /**
-     * returns the coordinator of this view, which may not be the creator if it is not in the view.
-     *  GemStoneAddition
-     *
-     * @return the coordinator as chosen by {@link Membership#getCoordinator()},
-     *         or null if this view has no members
-     */
-    public Address getCoordinator() {
-      // a view built with the empty constructor has no member list at all
-      if (this.members == null || this.members.isEmpty()) {
-        return null;
-      }
-      
-      return new Membership(this.members).getCoordinator();
-    }
-
-    /**
-     * Returns a reference to the List of members (ordered)
-     * Do NOT change this list, or you will invalidate the view
-     * Make a copy if you have to modify it.
-     *
-     * @return a reference to the ordered list of members in this view
-     */
-    public Vector getMembers() {
-        return members;
-    }
-    
-    /**
-     * GemStoneAddition
-     * get the members that were removed from the previous view due to
-     * Suspect processing
-     *
-     * @return the suspected members, or an empty set if none were recorded
-     */
-    public Set getSuspectedMembers() {
-      return this.suspectedMembers == null? Collections.EMPTY_SET : this.suspectedMembers;
-    }
-
-    /**
-     * GemStoneAddition -- getter for additional data
-     */
-    public Object getAdditionalData() {
-      return this.additionalData;
-    }
-
-    /**
-     * GemStoneAddition -- setter for additional data
-     * <p>
-     * Besides storing the payload this measures its Java-serialized size,
-     * which {@link #serializedSize(short)} later adds to the view size.
-     *
-     * @param data the payload to attach to this view; may be null
-     */
-    public void setAdditionalData(Object data) {
-      this.additionalData = data;
-      // until credential fragmentation is implemented, we must perform
-      // a size check to make sure the view fits into a datagram
-      try {
-        ByteArrayOutputStream bas = new ByteArrayOutputStream(10000);
-        ObjectOutputStream oos = new ObjectOutputStream(bas);
-        oos.writeObject(data);
-        // push anything still buffered in the object stream into bas
-        // before measuring it
-        oos.flush();
-        this.additionalDataSize = bas.size();
-//        if (serializedSize() > MAX_VIEW_SIZE) {
-//          this.additionalData = null;
-//          this.additionalDataSize = 0;
-//          throw new IllegalArgumentException(
-//            JGroupsStrings.View_SERIALIZED_VIEW_SIZE_0_EXCEEDS_MAXIMUM_OF_1
-//              .toLocalizedString(new Object[] { Integer.valueOf(bas.size()), Integer.valueOf(MAX_VIEW_SIZE) }));
-//        }
-      }
-      catch (IOException e) {
-        // ignore - this will happen again when the view is serialized
-        // for transmission.  Do not keep the size measured for a
-        // previously installed payload, though.
-        this.additionalDataSize = 0;
-      }
-    }
-
-    /**
-     * returns true, if this view contains a certain member
-     *
-     * @param mbr - the address of the member,
-     * @return true if this view contains the member, false if it doesn't
-     *         if the argument mbr is null, this operation returns false
-     */
-    public boolean containsMember(Address mbr) {
-        if(mbr == null || members == null) {
-            return false;
-        }
-        return members.contains(mbr);
-    }
-    
-    /**
-     * GemStoneAddition - removes the given address from the set of suspected members.
-     * Does nothing if no suspected members were recorded for this view.
-     * @param mbr the member that is no longer suspected
-     */
-    public void notSuspect(Address mbr) {
-      // suspectedMembers is only allocated when suspects were supplied
-      if (this.suspectedMembers != null) {
-        this.suspectedMembers.remove(mbr);
-      }
-    }
-    
-    /**
-     * GemStoneAddition - retrieve the address in the view that corresponds
-     * to the given address.  If the address is not in the view, return
-     * the argument.
-     *
-     * @param addr the address to look up
-     * @return the member instance that equals addr, or addr itself if
-     *         no member matches
-     */
-    public Address getMember(Address addr) {
-      if (members == null) {
-        return addr;
-      }
-      // reverse search to find the newest member that matches, in case
-      // of membership race condition that includes an old ID and a new ID
-      // for a member that's joining.  The scan must include index 0,
-      // otherwise the first member of the view is never found.
-      for (int i=members.size()-1; i>=0; i--) {
-        Address mbr = (Address)members.get(i);
-        if (mbr.equals(addr)) {
-          return mbr;
-        }
-      }
-      return addr;
-    }
-
-
-    /**
-     * Two views are equal if their view ids compare as equal and both have
-     * a non-null member list with equal contents.  A view is always equal
-     * to itself.
-     */
-    @Override // GemStoneAddition
-    public boolean equals(Object obj) {
-        if(obj == this)
-            return true; // equals must be reflexive, even for an empty view
-        if(!(obj instanceof View)) // also covers obj == null
-            return false;
-        if(vid == null)
-            return false;
-        View other=(View)obj;
-        if(vid.compareTo(other.vid) != 0)
-            return false;
-        if(members != null && other.members != null) {
-            return members.equals(other.members);
-        }
-        return false;
-    }
-
-    /*
-     * (non-Javadoc)
-     * @see java.lang.Object#hashCode()
-     * 
-     * Note that we just need to make sure that equal objects return equal
-     * hashcodes; nothing really elaborate is done here.
-     */
-    @Override // GemStoneAddition
-    public int hashCode() { // GemStoneAddition
-      int result = 0;
-      if (vid != null) {
-        result += vid.hashCode();
-        if (members != null) {
-          result += members.hashCode();
-        }
-      }
-      return result;
-    }
-    
-    /**
-     * returns the number of members in this view
-     *
-     * @return the number of members in this view 0..n
-     */
-    public int size() {
-        return members == null ? 0 : members.size();
-    }
-
-
-    /**
-     * creates a copy of this view
-     * <p>
-     * The view id, member list and suspect set are copied; the additional
-     * data object is shared with the original.
-     * NOTE(review): messageDigest is not carried over to the copy - confirm
-     * that this is intended.
-     *
-     * @return a copy of this view
-     */
-    @Override // GemStoneAddition
-    @SuppressFBWarnings(value="CN_IDIOM_NO_SUPER_CALL")
-    public Object clone() {
-        ViewId vid2=vid != null ? (ViewId)vid.clone() : null;
-        Vector members2=members != null ? (Vector)members.clone() : null;
-        View result = new View(vid2, members2);
-        if (this.suspectedMembers != null) {
-          result.suspectedMembers = new HashSet(this.suspectedMembers);
-        }
-        if (additionalData != null) {
-          result.additionalData = additionalData;
-          result.additionalDataSize = additionalDataSize;
-        }
-        return result;
-    }
-
-
-    /**
-     * debug only
-     *
-     * @return the view id followed by one member per line
-     */
-    public String printDetails() {
-        StringBuffer ret=new StringBuffer();
-        ret.append(vid).append("\n\t");
-        if(members != null) {
-            for(int i=0; i < members.size(); i++) {
-                ret.append(members.elementAt(i)).append("\n\t");
-            }
-            ret.append('\n');
-        }
-        return ret.toString();
-    }
-
-
-    // GemStoneAddition - get the member name for an address.  For an
-    // IpAddress the direct port is appended when it is greater than zero.
-    private String memberName(Address m) {
-      if (!(m instanceof IpAddress))
-        return String.valueOf(m); // same as m.toString(), but null-safe
-      IpAddress im = (IpAddress)m;
-      StringBuffer sb = new StringBuffer();
-
-      sb.append(im.toString());
-      int port = im.getDirectPort(); 
-      if (port > 0) {
-        sb.append('/');
-        sb.append(port);
-      }
-      return sb.toString();
-    }
-
-
-    @Override // GemStoneAddition
-    public String toString() {
-        StringBuffer ret=new StringBuffer(64);
-        // GemStoneAddition - give more info than jgroups defaults to giving
-        ret.append(vid);
-        ret.append(" [");
-        // members is null for a view built with the empty constructor
-        if (members != null) {
-          for (Iterator iter = members.iterator(); iter.hasNext(); ) {
-            Address member = (Address) iter.next();
-            // GemStoneAddition
-            ret.append(memberName(member));
-            
-            if (iter.hasNext()) {
-              ret.append(", ");
-            }
-          }
-        }
-        // GemStoneAddition
-        if (this.additionalData != null) {
-          ret.append(this.additionalData);
-        }
-        ret.append("]");
-        if (this.suspectedMembers != null && this.suspectedMembers.size() > 0) {
-          ret.append(" crashed mbrs: [");
-          for (Iterator it=this.suspectedMembers.iterator(); it.hasNext(); ) {
-            ret.append(memberName((Address)it.next()));
-            if (it.hasNext()) {
-              ret.append(", ");
-            }
-          }
-          ret.append(']');
-        }
-        //ret.append(vid).append(" ").append(members);
-        return ret.toString();
-    }
-    //public String toString() {
-    //    StringBuffer ret=new StringBuffer(64);
-    //    ret.append(vid).append(" ").append(members);
-    //    return ret.toString();
-    //}
-    
-    /** Java serialization: writes vid, members, suspects, payload size and payload, in that order. */
-    public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeObject(vid);
-        out.writeObject(members);
-        out.writeObject(this.suspectedMembers); // GemStoneAddition
-        out.writeInt(this.additionalDataSize);
-        out.writeObject(this.additionalData); // GemStoneAddition
-    }
-
-
-    /** Java serialization: reads the fields in the order written by {@link #writeExternal}. */
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        vid=(ViewId)in.readObject();
-        members=(Vector)in.readObject();
-        this.suspectedMembers = (Set)in.readObject(); // GemStoneAddition
-        this.additionalDataSize = in.readInt();
-        this.additionalData = in.readObject(); // GemStoneAddition
-    }
-    
-    /**
-     * GemStoneAddition - find the lead member in this view
-     *
-     * @return the first member that is an IpAddress whose vm kind is 10,
-     *         or null if there is none
-     */
-    public Address getLeadMember() {
-      if (members == null) {
-        return null;
-      }
-      for (int i=0; i<members.size(); i++) {
-        Address mbr = (Address)members.get(i);
-        // NOTE(review): 10 is presumably the vm-kind code of members that
-        // are eligible to be lead - confirm against IpAddress
-        if (mbr instanceof IpAddress && ((IpAddress)mbr).getVmKind() == 10) {
-          return mbr;
-        }
-      }
-      return null;
-    }
-
-
-    /** Streams this view through the GemFire serialization hooks; see {@link #toData}. */
-    public void writeTo(DataOutputStream out) throws IOException {
-      JChannel.getGfFunctions().invokeToData(this, out);
-    }
-    
-    /**
-     * Writes, in order: a presence flag and the view id, the member list,
-     * a presence flag and the suspect set, the message digest, the
-     * additional-data size and the additional data.
-     */
-    public void toData(DataOutput out) throws IOException {
-        // vid
-        if(vid != null) {
-            out.writeBoolean(true);
-            JChannel.getGfFunctions().invokeToData(vid, out);
-        }
-        else
-            out.writeBoolean(false);
-
-        // members:
-        JChannel.getGfFunctions().writeObject(members, out);
-        // GemStoneAddition - suspectedMembers
-        if (this.suspectedMembers == null) {
-          out.writeBoolean(false);
-        }
-        else {
-          out.writeBoolean(true);
-          JChannel.getGfFunctions().writeObject(this.suspectedMembers, out);
-        }
-        JChannel.getGfFunctions().writeObject(this.messageDigest, out);
-        // GemStoneAddition
-        out.writeInt(this.additionalDataSize);
-        JChannel.getGfFunctions().writeObject(this.additionalData, out);
-    }
-
-
-    /**
-     * Reads this view through the GemFire serialization hooks; see {@link #fromData}.
-     *
-     * @throws IllegalAccessException if a class needed to read the view
-     *         cannot be found; the ClassNotFoundException is attached as cause
-     */
-    public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
-      try {
-        JChannel.getGfFunctions().invokeFromData(this, in);
-      } catch (ClassNotFoundException ex) {
-        IllegalAccessException iae = new IllegalAccessException(
-            ExternalStrings.View_COULD_NOT_READ_ADDITIONAL_DATA_0
-              .toLocalizedString(ex));
-        // IllegalAccessException has no cause constructor; keep the stack
-        // trace of the original failure the same way ViewId.readFrom does
-        iae.initCause(ex);
-        throw iae;
-      }
-
-    }
-    
-    public int getDSFID() {
-      return JGROUPS_VIEW;
-    }
-    
-    public short[] getSerializationVersions() {
-      return null;
-    }
-    
-    /**
-     * Reads the fields in the order written by {@link #toData}.  Optional
-     * fields that are absent from the stream are reset to null so that no
-     * state from an earlier use of this instance survives.
-     */
-    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-        boolean b;
-        // vid:
-        b=in.readBoolean();
-        if(b) {
-            vid=new ViewId();
-            JChannel.getGfFunctions().invokeFromData(vid, in);
-        }
-        else {
-            vid=null;
-        }
-
-        // members:
-        members=JChannel.getGfFunctions().readObject(in);
-        
-        // GemStoneAddition - suspectedMembers
-        if (in.readBoolean()) {
-          this.suspectedMembers = JChannel.getGfFunctions().readObject(in);
-        }
-        else {
-          this.suspectedMembers = null;
-        }
-        this.messageDigest = JChannel.getGfFunctions().readObject(in);
-        // GemStoneAddition
-        this.additionalDataSize = in.readInt();
-        this.additionalData = JChannel.getGfFunctions().readObject(in);
-    }
-
-    /**
-     * Computes a size for this view from the view id, the member list, the
-     * suspect set and the measured additional-data size.
-     * NOTE(review): toData also writes a presence flag for the suspect set,
-     * the message digest and the additional-data size int, none of which
-     * are counted here - confirm whether callers need an exact figure.
-     */
-    public int serializedSize(short version) {
-        int retval=Global.BYTE_SIZE; // presence for vid
-        if(vid != null)
-            retval+=vid.serializedSize(version);
-        retval+=Util.size(members,version);
-        // GemStoneAddition - suspectedMembers
-        if (this.suspectedMembers != null) {
-          retval+=Util.size(this.suspectedMembers, version);
-        }
-        if (this.additionalData != null) {
-          retval += this.additionalDataSize;
-        }
-        return retval;
-    }
-
-
-    public void setMessageDigest(Digest messageDigest) {
-      this.messageDigest = messageDigest;
-    }
-
-
-    public Digest getMessageDigest() {
-      return messageDigest;
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/ViewId.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/ViewId.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/ViewId.java
deleted file mode 100644
index 48ff4c8..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/ViewId.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-// $Id: ViewId.java,v 1.10 2005/07/12 11:45:42 belaban Exp $
-
-package com.gemstone.org.jgroups;
-
-import com.gemstone.org.jgroups.util.Streamable;
-import com.gemstone.org.jgroups.util.Util;
-import com.gemstone.org.jgroups.util.VersionedStreamable;
-
-import java.io.*;
-
-
-/**
- * ViewIds are used for ordering views (each view has a ViewId and a list of members).
- * Ordering between views is important for example in a virtual synchrony protocol where
- * all views seen by a member have to be ordered.
- */
-public class ViewId implements Externalizable, Comparable, Cloneable, VersionedStreamable {
-    Address coord_addr=null;   // Address of the issuer of this view
-    long id=0;                 // Lamport time of the view
-
-
-    public ViewId() { // used for externalization
-    }
-
-
-    /**
-     * Creates a ViewID with the coordinator address and a Lamport timestamp of 0.
-     *
-     * @param coord_addr the address of the member that issued this view
-     */
-    public ViewId(Address coord_addr) {
-        this.coord_addr=coord_addr;
-    }
-
-    /**
-     * Creates a ViewID with the coordinator address and the given Lamport timestamp.
-     *
-     * @param coord_addr - the address of the member that issued this view
-     * @param id         - the Lamport timestamp of the view
-     */
-    public ViewId(Address coord_addr, long id) {
-        this.coord_addr=coord_addr;
-        this.id=id;
-    }
-
-    /**
-     * returns the lamport time of the view
-     *
-     * @return the lamport time timestamp
-     */
-    public long getId() {
-        return id;
-    }
-
-
-    /**
-     * returns the address of the member that issued this view
-     *
-     * @return the Address of the the issuer
-     */
-    public Address getCoordAddress() {
-        return coord_addr;
-    }
-
-
-    @Override // GemStoneAddition
-    public String toString() {
-        return "[" + coord_addr + '|' + id + ']';
-    }
-
-    /**
-     * Cloneable interface
-     * Returns a new ViewID object containing the same address and lamport timestamp as this view
-     */
-    @Override // GemStoneAddition
-    public Object clone() {
-        return new ViewId(coord_addr, id);
-    }
-
-    /**
-     * Old Copy method, deprecated because it is substituted by clone()
-     */
-    public ViewId copy() {
-        return (ViewId)clone();
-    }
-
-    /**
-     * Establishes an order between 2 ViewIds. First compare on id. <em>Compare on coord_addr
-     * only if necessary</em> (i.e. ids are equal) !
-     *
-     * @return 0 for equality, value less than 0 if smaller, greater than 0 if greater.
-     */
-    public int compareTo(Object other) {
-        if(other == null) return 1; //+++ Maybe necessary to throw an exception
-
-        if(!(other instanceof ViewId)) {
-            throw new ClassCastException("ViewId.compareTo(): view id is not comparable with different Objects");
-        }
-        return id > ((ViewId)other).id ? 1 : id < ((ViewId)other).id ? -1 : 0;
-    }
-
-    /**
-     * Old Compare
-     */
-    public int compare(Object o) {
-        return compareTo(o);
-    }
-
-
-    @Override // GemStoneAddition
-    public boolean equals(Object other_view) {
-      if (other_view == null || !(other_view instanceof ViewId)) return false; // GemStoneAddition
-        return compareTo(other_view) == 0 ? true : false;
-    }
-
-
-    @Override // GemStoneAddition
-    public int hashCode() {
-        return (int)id;
-    }
-
-
-    public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeObject(coord_addr);
-        out.writeLong(id);
-    }
-
-
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        coord_addr=(Address)in.readObject();
-        id=in.readLong();
-    }
-
-    public void writeTo(DataOutputStream out) throws IOException {
-      JChannel.getGfFunctions().invokeToData(this, out);
-    }
-    
-    public void toData(DataOutput out) throws IOException {
-        JChannel.getGfFunctions().writeObject(coord_addr, out);
-        out.writeLong(id);
-    }
-
-    public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
-      try {
-        JChannel.getGfFunctions().invokeFromData(this, in);
-      } catch (Exception e) {
-        InstantiationException ex = new InstantiationException("problem deserializing ViewId");
-        ex.initCause(e);
-        throw ex;
-      }
-    }
-    
-    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-        coord_addr=JChannel.getGfFunctions().readObject(in);
-        id=in.readLong();
-    }
-
-    public int serializedSize(short version) {
-        int retval=Global.LONG_SIZE; // for the id
-        retval+=Util.size(coord_addr,version);
-        return retval;
-    }
-
-
-    @Override
-    public short[] getSerializationVersions() {
-      return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/ConnectionTable.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/ConnectionTable.java
deleted file mode 100644
index 2b4e980..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/ConnectionTable.java
+++ /dev/null
@@ -1,1051 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-// $Id: ConnectionTable.java,v 1.39 2005/11/18 19:50:54 smarlownovell Exp $
-
-package com.gemstone.org.jgroups.blocks;
-
-import com.gemstone.org.jgroups.util.GemFireTracer;
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.JGroupsVersion;
-import com.gemstone.org.jgroups.stack.IpAddress;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.Queue;
-import com.gemstone.org.jgroups.util.QueueClosedException;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.io.*;
-import java.net.*;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Vector;
-
-
-/**
- * Manages incoming and outgoing TCP connections. For each outgoing message to destination P, if there
- * is not yet a connection for P, one will be created. Subsequent outgoing messages will use this
- * connection.  For incoming messages, one server socket is created at startup. For each new incoming
- * client connecting, a new thread from a thread pool is allocated and listens for incoming messages
- * until the socket is closed by the peer.<br>Sockets/threads with no activity will be killed
- * after some time.
- * <p>
- * Incoming messages from any of the sockets can be received by setting the message listener.
- * @author Bela Ban
- */
-public class ConnectionTable implements Runnable {
-    final HashMap       conns=new HashMap();       // keys: Addresses (peer address), values: Connection; guarded by synchronized(conns)
-    Receiver            receiver=null;               // callback invoked by receive() for every incoming message
-    ServerSocket        srv_sock=null;               // accepts incoming connections; set to null by stop()
-//    boolean             reuse_addr=false; GemStoneAddition
-    boolean             use_send_queues=true;        // if true, Connection.send() queues a copy of the data instead of writing it directly
-    InetAddress         bind_addr=null;              // local interface for the server socket and for outgoing sockets
-
-    /**
-     * The address which will be broadcast to the group (the externally visible address which this host should
-     * be contacted on). If external_addr is null, it will default to the same address that the server socket is bound to.
-     */
-    InetAddress		    external_addr=null;
-    Address             local_addr=null;             // bind_addr + port of srv_sock
-    int                 srv_port=7800;
-    int                 max_port=0;                   // maximum port to bind to (if < srv_port, no limit)
-    Thread              acceptor=null;               // continuously calls srv_sock.accept()
-    static final int    backlog=20;                  // 20 conn requests are queued by ServerSocket (addtl will be discarded)
-    int                 recv_buf_size=120000;        // receive buffer size (bytes) requested on every socket
-    int                 send_buf_size=60000;         // send buffer size (bytes) requested on every socket
-    final Vector        conn_listeners=new Vector(); // listeners to be notified when a conn is established/torn down
-    final Object        recv_mutex=new Object();     // to serialize simultaneous access to receive() from multiple Connections
-    Reaper              reaper=null;                 // closes conns that have been idle for more than n secs
-    long                reaper_interval=60000;       // reap unused conns once a minute
-    long                conn_expire_time=300000;     // connections can be idle for 5 minutes before they are reaped
-    boolean             use_reaper=false;            // by default we don't reap idle conns
-    int                 sock_conn_timeout=1000;      // max time in millis to wait for Socket.connect() to return
-    ThreadGroup         thread_group=null;           // created in start(); holds the acceptor and the receiver threads
-    protected final GemFireTracer log=GemFireTracer.getLog(getClass());
-    final static        byte[] NULL_DATA={};         // queued in place of a null buffer by Connection.send()
-    final byte[]        cookie={'b', 'e', 'l', 'a'}; // written first on every new connection and checked by the accepting side
-
-
-
-    /** Used for message reception. */
-    public interface Receiver {
-        /**
-         * Called for every message handed to {@link ConnectionTable#receive(Address, byte[], int, int)}.
-         * @param sender the address passed to receive() as the sender
-         * @param data buffer holding the message
-         * @param offset index of the first message byte in <code>data</code>
-         * @param length number of message bytes
-         */
-        void receive(Address sender, byte[] data, int offset, int length);
-    }
-
-
-
-    /** Used to be notified about connection establishment and teardown. */
-    public interface ConnectionListener {
-        /** Invoked with the peer's address after a connection to or from that peer has been set up. */
-        void connectionOpened(Address peer_addr);
-        /** Invoked with the peer's address when the connection to that peer is reported as closed. */
-        void connectionClosed(Address peer_addr);
-    }
-
-
-    /**
-     * Regular ConnectionTable without expiration of idle connections. The table is started
-     * (see {@link #start()}) before the constructor returns.
-     * @param srv_port The port on which the server will listen. If this port is reserved, the next
-     *                 free port will be taken (incrementing srv_port).
-     * @throws Exception if {@link #start()} fails
-     */
-    public ConnectionTable(int srv_port) throws Exception {
-        this.srv_port=srv_port;
-        start();
-    }
-
-
-    /**
-     * Regular ConnectionTable without expiration of idle connections, bound to a specific interface.
-     * The table is started (see {@link #start()}) before the constructor returns.
-     * @param bind_addr The interface to which the server socket will bind (may be null)
-     * @param srv_port The first port to try for the server socket
-     * @throws Exception if {@link #start()} fails
-     */
-    public ConnectionTable(InetAddress bind_addr, int srv_port) throws Exception {
-        this.srv_port=srv_port;
-        this.bind_addr=bind_addr;
-        start();
-    }
-
-
-    /**
-     * ConnectionTable including a connection reaper. Connections that have been idle for more than conn_expire_time
-     * milliseconds will be closed and removed from the connection table. On next access they will be re-created.
-     * The table is started (see {@link #start()}) before the constructor returns.
-     * @param srv_port The port on which the server will listen
-     * @param reaper_interval Number of milliseconds to wait for reaper between attempts to reap idle connections
-     * @param conn_expire_time Number of milliseconds a connection can be idle (no traffic sent or received) until
-     *                         it will be reaped
-     * @throws Exception if {@link #start()} fails
-     */
-    public ConnectionTable(int srv_port, long reaper_interval, long conn_expire_time) throws Exception {
-        this.srv_port=srv_port;
-        this.reaper_interval=reaper_interval;
-        this.conn_expire_time=conn_expire_time;
-        use_reaper=true;
-        start();
-    }
-
-
-    /**
-     * Create a ConnectionTable without expiration of idle connections. The table is started
-     * (see {@link #start()}) before the constructor returns.
-     * @param r A reference to a receiver of all messages received by this class. Method <code>receive()</code>
-     *          will be called.
-     * @param bind_addr The host name or IP address of the interface to which the server socket will bind.
-     *                  This is interesting only in multi-homed systems. If bind_addr is null, the
-     *                  server socket will bind to the first available interface (e.g. /dev/hme0 on
-     *                  Solaris or /dev/eth0 on Linux systems).
-     * @param external_addr The address which will be broadcast to the group (the externally visible address
-     *                   which this host should be contacted on). If external_addr is null, it will default to
-     *                   the same address that the server socket is bound to.
-     * @param srv_port The port to which the server socket will bind to. If this port is reserved, the next
-     *                 free port will be taken (incrementing srv_port).
-     * @param max_port The largest port number that the server socket will be bound to. If max_port < srv_port
-     *                 then there is no limit.
-     * @throws Exception if {@link #start()} fails
-     */
-    public ConnectionTable(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port) throws Exception {
-        setReceiver(r);
-        this.bind_addr=bind_addr;
-	    this.external_addr=external_addr;
-        this.srv_port=srv_port;
-	    this.max_port=max_port;
-        start();
-    }
-
-
-    /**
-     * ConnectionTable including a connection reaper. Connections that have been idle for more than conn_expire_time
-     * milliseconds will be closed and removed from the connection table. On next access they will be re-created.
-     * The table is started (see {@link #start()}) before the constructor returns.
-     *
-     * @param r The Receiver
-     * @param bind_addr The host name or IP address of the interface to which the server socket will bind.
-     *                  This is interesting only in multi-homed systems. If bind_addr is null, the
-     *                  server socket will bind to the first available interface (e.g. /dev/hme0 on
-     *                  Solaris or /dev/eth0 on Linux systems).
-     * @param external_addr The address which will be broadcast to the group (the externally visible address
-     *                   which this host should be contacted on). If external_addr is null, it will default to
-     *                   the same address that the server socket is bound to.
-     * @param srv_port The port to which the server socket will bind to. If this port is reserved, the next
-     *                 free port will be taken (incrementing srv_port).
-     * @param max_port The largest port number that the server socket will be bound to. If max_port < srv_port
-     *                 then there is no limit.
-     * @param reaper_interval Number of milliseconds to wait for reaper between attempts to reap idle connections
-     * @param conn_expire_time Number of milliseconds a connection can be idle (no traffic sent or received) until
-     *                         it will be reaped
-     * @throws Exception if {@link #start()} fails
-     */
-    public ConnectionTable(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port,
-                           long reaper_interval, long conn_expire_time) throws Exception {
-        setReceiver(r);
-        this.bind_addr=bind_addr;
-        this.external_addr=external_addr;
-        this.srv_port=srv_port;
-        this.max_port=max_port;
-        this.reaper_interval=reaper_interval;
-        this.conn_expire_time=conn_expire_time;
-        use_reaper=true;
-        start();
-    }
-
-
-    /**
-     * Sets the callback that {@link #receive(Address, byte[], int, int)} forwards incoming messages to.
-     * @param r the new receiver; with a null receiver, receive() only logs an error
-     */
-    public void setReceiver(Receiver r) {
-        receiver=r;
-    }
-
-
-    /**
-     * Registers a listener for connection open/close notifications.
-     * Null listeners and listeners that are already registered are ignored.
-     */
-    public void addConnectionListener(ConnectionListener l) {
-        if(l == null || conn_listeners.contains(l))
-            return;
-        conn_listeners.addElement(l);
-    }
-
-
-    /** Unregisters a previously added listener; a null argument is ignored. */
-    public void removeConnectionListener(ConnectionListener l) {
-        if(l == null)
-            return;
-        conn_listeners.removeElement(l);
-    }
-
-
-    /**
-     * Returns the local address. If none has been set yet and a bind address is known, an address
-     * is built from <code>bind_addr</code> and <code>srv_port</code> and remembered.
-     * @return the local address, or null if it is unset and <code>bind_addr</code> is null
-     */
-    public Address getLocalAddress() {
-        if(local_addr == null && bind_addr != null)
-            local_addr=new IpAddress(bind_addr, srv_port);
-        return local_addr;
-    }
-
-
-    /** @return the send buffer size (bytes) requested on sockets created or accepted by this table */
-    public int getSendBufferSize() {
-        return send_buf_size;
-    }
-
-    /** Sets the send buffer size (bytes) requested on sockets created or accepted from now on. */
-    public void setSendBufferSize(int send_buf_size) {
-        this.send_buf_size=send_buf_size;
-    }
-
-    /** @return the receive buffer size (bytes) requested on sockets created or accepted by this table */
-    public int getReceiveBufferSize() {
-        return recv_buf_size;
-    }
-
-    /** Sets the receive buffer size (bytes) requested on sockets created or accepted from now on. */
-    public void setReceiveBufferSize(int recv_buf_size) {
-        this.recv_buf_size=recv_buf_size;
-    }
-
-    /** @return the timeout (milliseconds) passed to Socket.connect() for outgoing connections */
-    public int getSocketConnectionTimeout() {
-        return sock_conn_timeout;
-    }
-
-    /** Sets the timeout (milliseconds) passed to Socket.connect() for outgoing connections. */
-    public void setSocketConnectionTimeout(int sock_conn_timeout) {
-        this.sock_conn_timeout=sock_conn_timeout;
-    }
-
-    /** @return the number of entries currently in the connection table (read while holding the conns lock) */
-    public int getNumConnections() {
-      synchronized (conns) { // GemStoneAddition
-        return conns.size();
-      }
-    }
-
-    /** @return true if Connection.send() queues outgoing data instead of writing it directly */
-    public boolean getUseSendQueues() {return use_send_queues;}
-    /** Chooses between queued sending (true) and direct writes (false) in Connection.send(). */
-    public void setUseSendQueues(boolean flag) {this.use_send_queues=flag;}
-
-
-
-    /**
-     * Sends <code>length</code> bytes of <code>data</code>, starting at <code>offset</code>, to
-     * <code>dest</code>, creating the connection first if none exists yet.
-     * <p>
-     * A null destination or a null buffer is logged and the call returns without sending.
-     * A failure while sending is logged (including the stack trace) and the destination is removed
-     * from the connection table; it is not propagated to the caller.
-     *
-     * @param dest the peer to send to
-     * @param data buffer holding the message
-     * @param offset index of the first byte to send
-     * @param length number of bytes to send
-     * @throws Exception if the connection to <code>dest</code> could not be obtained; the original
-     *         failure is attached as the cause
-     */
-    public void send(Address dest, byte[] data, int offset, int length) throws Exception {
-        if(dest == null) {
-            if(log.isErrorEnabled())
-                log.error(ExternalStrings.ConnectionTable_DESTINATION_IS_NULL);
-            return;
-        }
-
-        if(data == null) {
-            log.warn("data is null; discarding packet");
-            return;
-        }
-
-        // 1. Try to obtain correct Connection (or create one if not yet existent)
-        Connection conn;
-        try {
-            conn=getConnection(dest);
-        }
-        catch(Throwable ex) {
-            throw new Exception("connection to " + dest + " could not be established", ex);
-        }
-        if(conn == null) return;
-
-        // 2. Send the message using that connection
-        try {
-            conn.send(data, offset, length);
-        }
-        catch(Throwable ex) {
-            // report through the logger (with stack trace) rather than printing to stderr
-            if(log.isErrorEnabled())
-                log.error("sending msg to " + dest + " failed (" + ex.getClass().getName() + "); removing from connection table", ex);
-            remove(dest);
-        }
-    }
-
-
-    /**
-     * Returns the connection registered for <code>dest</code>, creating one if none exists yet.
-     * <p>
-     * Creating a connection means: open a socket bound to <code>bind_addr</code>, connect it to
-     * <code>dest</code> (waiting at most <code>sock_conn_timeout</code> ms), request the configured
-     * buffer sizes, send the local address, notify the listeners, register the connection and
-     * start its receiver thread. All of this happens while holding the <code>conns</code> lock.
-     * If the socket cannot be set up it is closed again before the exception is propagated.
-     *
-     * @param dest the peer address; must be an IpAddress
-     * @return the existing or newly created connection
-     * @throws Exception if the socket could not be bound, connected or configured
-     */
-    Connection getConnection(Address dest) throws Exception {
-        synchronized(conns) {
-            Connection conn=(Connection)conns.get(dest);
-            if(conn != null)
-                return conn;
-
-            // changed by bela Jan 18 2004: use the bind address for the client sockets as well
-            SocketAddress tmpBindAddr=new InetSocketAddress(bind_addr, 0);
-            InetAddress tmpDest=((IpAddress)dest).getIpAddress();
-            SocketAddress destAddr=new InetSocketAddress(tmpDest, ((IpAddress)dest).getPort());
-            Socket sock=new Socket();
-            try {
-                sock.bind(tmpBindAddr);
-                sock.connect(destAddr, sock_conn_timeout);
-
-                try {
-                    sock.setSendBufferSize(send_buf_size);
-                }
-                catch(IllegalArgumentException ex) {
-                    if(log.isErrorEnabled()) log.error("exception setting send buffer size to " +
-                            send_buf_size + " bytes", ex);
-                }
-                try {
-                    sock.setReceiveBufferSize(recv_buf_size);
-                }
-                catch(IllegalArgumentException ex) {
-                    if(log.isErrorEnabled()) log.error("exception setting receive buffer size to " +
-                            recv_buf_size + " bytes", ex);
-                }
-            }
-            catch(Exception ex) {
-                // the socket never made it into a Connection, so nobody else would close it
-                try {
-                    sock.close();
-                }
-                catch(IOException ignored) {
-                    // best effort; the original failure is the one worth reporting
-                }
-                throw ex;
-            }
-
-            conn=new Connection(sock, dest);
-            conn.sendLocalAddress(local_addr);
-            notifyConnectionOpened(dest);
-            // conns.put(dest, conn);
-            addConnection(dest, conn);
-            conn.init();
-            if(log.isInfoEnabled()) log.info(ExternalStrings.ConnectionTable_CREATED_SOCKET_TO__0, dest);
-            return conn;
-        }
-    }
-
-
-    /**
-     * Starts the table: runs the {@link #init()} hook, creates the server socket, derives
-     * <code>local_addr</code> from the port actually bound (preferring <code>external_addr</code>,
-     * then <code>bind_addr</code>), starts the daemon acceptor thread in a new thread group and,
-     * if reaping is enabled and no reaper exists yet, starts the connection reaper.
-     *
-     * @throws Exception if init() or the creation of the server socket fails
-     */
-    public void start() throws Exception {
-        init();
-        srv_sock=createServerSocket(srv_port, max_port);
-
-        final int bound_port=srv_sock.getLocalPort();
-        if(external_addr != null) {
-            local_addr=new IpAddress(external_addr, bound_port);
-        }
-        else if(bind_addr != null) {
-            local_addr=new IpAddress(bind_addr, bound_port);
-        }
-        else {
-            local_addr=new IpAddress(bound_port);
-        }
-
-        if(log.isInfoEnabled()) log.info(ExternalStrings.ConnectionTable_SERVER_SOCKET_CREATED_ON__0, local_addr);
-
-        // Roland Kurmann 4/7/2003: the acceptor runs in its own thread group
-        thread_group=new ThreadGroup(Thread.currentThread().getThreadGroup(), "ConnectionTableGroup");
-        acceptor=new Thread(thread_group, this, "ConnectionTable.AcceptorThread");
-        acceptor.setDaemon(true);
-        acceptor.start();
-
-        // the connection reaper periodically removes unused connections
-        if(!use_reaper || reaper != null)
-            return;
-        reaper=new Reaper();
-        reaper.start();
-    }
-
-    /**
-     * Hook invoked at the very beginning of {@link #start()}, before the server socket is created.
-     * This implementation does nothing.
-     */
-    protected void init() throws Exception {
-    }
-    
-    /**
-     * Shuts the table down: closes the server socket, destroys every connection in the table,
-     * empties the table and clears <code>local_addr</code>.
-     */
-    public void stop() {
-        // 1. close the server socket (this also stops the acceptor thread)
-        if(srv_sock != null) {
-            try {
-                ServerSocket closing=srv_sock;
-                srv_sock=null; // cleared before close() so the acceptor loop sees the shutdown
-                closing.close();
-            }
-            catch(Exception e) {
-                // nothing useful can be done if closing fails during shutdown
-            }
-        }
-
-        // 2. then close the connections
-        synchronized(conns) {
-            for(Iterator it=conns.values().iterator(); it.hasNext();) {
-                ((Connection)it.next()).destroy();
-            }
-            conns.clear();
-        }
-        local_addr=null;
-    }
-
-
-    /**
-     * Removes <code>addr</code> from the connection table and destroys its connection, if there is one.
-     * This is typically triggered when a member is suspected.
-     * The table lock is held only for the removal itself, not while the connection is destroyed.
-     */
-    public void remove(Address addr) {
-        Connection conn;
-
-        synchronized(conns) {
-            conn=(Connection)conns.remove(addr);
-        }
-
-        if(conn != null) {
-            try {
-                conn.destroy();  // won't do anything if already destroyed
-            }
-            catch(Exception e) {
-                // failures while tearing the connection down are ignored
-            }
-        }
-        if(log.isTraceEnabled()) log.trace("removed " + addr + ", connections are " + toString());
-    }
-
-
-    /**
-     * Acceptor thread. Continuously accepts new connections for as long as <code>srv_sock</code> is
-     * non-null (stop() clears it and closes the socket). For each accepted socket the configured
-     * buffer sizes are requested, a Connection is created, the peer's address is read from it and,
-     * unless the peer is already in the table, the connection is registered and the listeners are
-     * notified. The connection's receiver thread is started in either case.
-     * <p>
-     * If a SocketException occurs, the connection created in the current iteration (if any) is
-     * destroyed. Connections accepted in earlier iterations are never touched.
-     */
-    public void run() {
-        Socket     client_sock;
-        Connection conn=null;
-        Address    peer_addr;
-
-        while(srv_sock != null) {
-            // forget the previous iteration's connection: the SocketException handler below must
-            // only destroy a connection created in THIS iteration, never an established one
-            conn=null;
-            try {
-                client_sock=srv_sock.accept();
-                if(log.isTraceEnabled())
-                    log.trace("accepted connection from " + client_sock.getInetAddress() + ":" + client_sock.getPort());
-                try {
-                    client_sock.setSendBufferSize(send_buf_size);
-                }
-                catch(IllegalArgumentException ex) {
-                    if(log.isErrorEnabled()) log.error("exception setting send buffer size to " +
-                           send_buf_size + " bytes", ex);
-                }
-                try {
-                    client_sock.setReceiveBufferSize(recv_buf_size);
-                }
-                catch(IllegalArgumentException ex) {
-                    if(log.isErrorEnabled()) log.error("exception setting receive buffer size to " +
-                           recv_buf_size + " bytes", ex);
-                }
-
-                // create new thread and add to conn table
-                conn=new Connection(client_sock, null); // will call receive(msg)
-                // get peer's address
-                peer_addr=conn.readPeerAddress(client_sock);
-
-                // client_addr=new IpAddress(client_sock.getInetAddress(), client_port);
-                conn.setPeerAddress(peer_addr);
-
-                synchronized(conns) {
-                    if(conns.containsKey(peer_addr)) {
-                        if(log.isTraceEnabled())
-                            log.trace(peer_addr + " is already there, will reuse connection");
-                        //conn.destroy();
-                        //continue; // return; // we cannot terminate the thread (bela Sept 2 2004)
-                    }
-                    else {
-                        // conns.put(peer_addr, conn);
-                        addConnection(peer_addr, conn);
-                        notifyConnectionOpened(peer_addr);
-                    }
-                }
-
-                conn.init(); // starts handler thread on this socket
-            }
-            catch(SocketException sock_ex) {
-                if(log.isInfoEnabled()) log.info(ExternalStrings.ConnectionTable_EXCEPTION_IS__0, sock_ex);
-                if(conn != null)
-                    conn.destroy();
-                if(srv_sock == null)
-                    break;  // socket was closed, therefore stop
-            }
-            catch(Throwable ex) {
-                if(log.isWarnEnabled()) log.warn("exception is " + ex);
-                if(srv_sock == null)
-                    break;  // socket was closed, therefore stop
-            }
-        }
-        if(log.isTraceEnabled())
-            log.trace(Thread.currentThread().getName() + " terminated");
-    }
-
-
-    /**
-     * Calls the receiver callback. We serialize access to this method because it may be called concurrently
-     * by several Connection handler threads. Therefore the receiver doesn't need to synchronize.
-     */
-    public void receive(Address sender, byte[] data, int offset, int length) {
-        if(receiver != null) {
-            synchronized(recv_mutex) {
-                receiver.receive(sender, data, offset, length);
-            }
-        }
-        else
-            if(log.isErrorEnabled()) log.error(ExternalStrings.ConnectionTable_RECEIVER_IS_NULL_NOT_SET_);
-    }
-
-
-    /**
-     * Lists the connection table: a header line with the number of connections, then one
-     * "key: address: connection" line per entry, then an empty line. Works on a snapshot of
-     * the table so the lock is not held while formatting.
-     */
-    @Override // GemStoneAddition
-    public String toString() {
-        HashMap snapshot;
-        synchronized(conns) {
-            snapshot=new HashMap(conns);
-        }
-
-        StringBuffer sb=new StringBuffer();
-        sb.append("connections (").append(snapshot.size()).append("):\n");
-        Iterator entries=snapshot.entrySet().iterator();
-        while(entries.hasNext()) {
-            Map.Entry e=(Map.Entry)entries.next();
-            Address peer=(Address)e.getKey();
-            Connection c=(Connection)e.getValue();
-            sb.append("key: ").append(peer).append(": ").append(c).append('\n');
-        }
-        return sb.append('\n').toString();
-    }
-
-
-    /**
-     * Finds the first available port starting at <code>start_port</code>, binds a server socket
-     * to it and records the port in <code>srv_port</code>. The socket is bound to
-     * <code>bind_addr</code> (with the configured backlog) when a bind address is set.
-     * <p>
-     * A bind failure (as classified by <code>Util.treatAsBindException()</code>) makes the method
-     * try the next port. Any other failure is logged and rethrown, so this method never
-     * returns null.
-     *
-     * @param start_port the first port to try
-     * @param end_port the last port to try; the search stops with a BindException once the port
-     *                 equal to end_port has failed (an end_port below start_port is never reached)
-     * @return the bound server socket
-     * @throws BindException if no port up to end_port could be bound, or if <code>bind_addr</code>
-     *         does not belong to any network interface
-     * @throws Exception for any other failure while creating the socket
-     */
-    protected ServerSocket createServerSocket(int start_port, int end_port) throws Exception {
-        int port=start_port;
-
-        while(true) {
-            try {
-                ServerSocket ret;
-                if(bind_addr == null)
-                    ret=new ServerSocket(port);
-                else
-                    ret=new ServerSocket(port, backlog, bind_addr);
-                srv_port=port;
-                return ret;
-            }
-            catch(SocketException bind_ex) {
-                // GemStoneAddition
-                if(!Util.treatAsBindException(bind_ex)) {
-                    // not a manifestation of BindException, handle like IOException
-                    if(log.isErrorEnabled()) log.error(ExternalStrings.ConnectionTable_EXCEPTION_IS__0, bind_ex);
-                    throw bind_ex; // returning null here would only surface later as a NullPointerException
-                }
-                if(port == end_port) throw new BindException("No available port to bind to");
-                if(bind_addr != null) {
-                    // trying further ports is pointless if the address itself is not ours
-                    NetworkInterface nic=NetworkInterface.getByInetAddress(bind_addr);
-                    if(nic == null)
-                        throw new BindException("bind_addr " + bind_addr + " is not a valid interface");
-                }
-                port++;
-            }
-            catch(IOException io_ex) {
-                if(log.isErrorEnabled()) log.error(ExternalStrings.ConnectionTable_EXCEPTION_IS__0, io_ex);
-                throw io_ex; // returning null here would only surface later as a NullPointerException
-            }
-        }
-    }
-
-
-    /** Tells every registered listener that a connection to <code>peer</code> was opened; a null peer is ignored. */
-    void notifyConnectionOpened(Address peer) {
-        if(peer != null) {
-            for(int idx=0; idx < conn_listeners.size(); idx++) {
-                ConnectionListener listener=(ConnectionListener)conn_listeners.elementAt(idx);
-                listener.connectionOpened(peer);
-            }
-        }
-    }
-
-    /** Tells every registered listener that the connection to <code>peer</code> was closed; a null peer is ignored. */
-    void notifyConnectionClosed(Address peer) {
-        if(peer != null) {
-            for(int idx=0; idx < conn_listeners.size(); idx++) {
-                ConnectionListener listener=(ConnectionListener)conn_listeners.elementAt(idx);
-                listener.connectionClosed(peer);
-            }
-        }
-    }
-
-
-    /**
-     * Registers <code>c</code> as the connection for <code>peer</code> (replacing any previous
-     * entry) and starts the reaper again if one exists but is not running.
-     */
-    void addConnection(Address peer, Connection c) {
-      synchronized (conns) { // GemStoneAddition
-        conns.put(peer, c);
-      }
-        if(reaper != null && !reaper.isRunning())
-            reaper.start();
-    }
-
-
-
-
-    class Connection implements Runnable {
-        Socket           sock=null;                // socket to/from peer (result of srv_sock.accept() or new Socket())
-        String           sock_addr=null;           // "local - remote" description of the socket, built lazily by getSockAddress(); used for Thread.getName()
-        DataOutputStream out=null;                 // for sending messages
-        DataInputStream  in=null;                  // for receiving messages
-        Thread           receiverThread=null;      // thread for receiving messages // GemStoneAddition - accesses synchronized via this
-        Address          peer_addr=null;           // address of the 'other end' of the connection
-        final Object     send_mutex=new Object();  // serialize sends
-        long             last_access=System.currentTimeMillis(); // last time a message was sent or received
-
-        /** Queue of byte[] of data to be sent to the peer of this connection */
-        Queue            send_queue=new Queue();
-        Sender           sender=new Sender();      // started on demand by send() when send queues are in use
-
-
-        /**
-         * Describes the socket as "localHost:localPort - remoteHost:remotePort". The text is
-         * built on first use and cached.
-         * @return the description, or null if there is no socket and nothing has been cached
-         */
-        protected String getSockAddress() {
-            if(sock_addr == null && sock != null) {
-                String local_end=sock.getLocalAddress().getHostAddress() + ':' + sock.getLocalPort();
-                String remote_end=sock.getInetAddress().getHostAddress() + ':' + sock.getPort();
-                sock_addr=local_end + " - " + remote_end;
-            }
-            return sock_addr;
-        }
-
-
-
-
-        /**
-         * Wraps a connected socket and opens buffered data streams on it. A failure to open the
-         * streams is logged, not thrown; <code>in</code> and/or <code>out</code> then stay null.
-         * @param s the connected socket
-         * @param peer_addr the peer's address, or null if it is not known yet
-         */
-        Connection(Socket s, Address peer_addr) {
-            sock=s;
-            this.peer_addr=peer_addr;
-            try {
-                // out=new DataOutputStream(sock.getOutputStream());
-                // in=new DataInputStream(sock.getInputStream());
-
-                // The change to buffered input and output stream yielded a 400% performance gain !
-                // bela Sept 7 2006
-                out=new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));
-                in=new DataInputStream(new BufferedInputStream(sock.getInputStream()));
-
-                // in=new DataInputStream(sock.getInputStream());
-            }
-            catch(Exception ex) {
-                if(log.isErrorEnabled()) log.error(ExternalStrings.ConnectionTable_EXCEPTION_IS__0, ex);
-            }
-        }
-
-
-        /** @return true if the receiver thread exists and is alive */
-        synchronized /* GemStoneAddition */ boolean established() {
-            return receiverThread != null && receiverThread.isAlive() /* GemStoneAddition */;
-        }
-
-
-        /** Sets the address of the other end of this connection. */
-        void setPeerAddress(Address peer_addr) {
-            this.peer_addr=peer_addr;
-        }
-
-        /** @return the address of the other end of this connection (may be null) */
-        Address getPeerAddress() {return peer_addr;}
-
-        /** Records the current time as the moment of the last activity on this connection. */
-        void updateLastAccessed() {
-            last_access=System.currentTimeMillis();
-        }
-
-        /**
-         * Starts the daemon thread that receives messages on this connection, unless such a
-         * thread is already alive. The thread is created in the table's thread group and named
-         * after the socket's endpoints.
-         */
-        synchronized /* GemStoneAddition */ void init() {
-            // if(log.isInfoEnabled()) log.info("connection was created to " + peer_addr);
-            if(receiverThread != null && receiverThread.isAlive())
-                return; // already receiving
-
-            // Roland Kurmann 4/7/2003, put in thread_group
-            String thread_name="ConnectionTable.Connection.Receiver [" + getSockAddress() + "]";
-            receiverThread=new Thread(thread_group, this, thread_name);
-            receiverThread.setDaemon(true);
-            receiverThread.start();
-            if(log.isTraceEnabled())
-                log.trace("ConnectionTable.Connection.Receiver started");
-        }
-
-
-        /**
-         * Tears this connection down: closes the socket, stops the sender, interrupts the
-         * receiver thread (if any) and forgets it.
-         */
-        synchronized /* GemStoneAddition */ void destroy() {
-            closeSocket(); // should terminate handler as well
-            sender.stop();
-            if (receiverThread != null) receiverThread.interrupt(); // GemStoneAddition
-            receiverThread=null;
-        }
-
-
-        /**
-         * Sends <code>length</code> bytes of <code>data</code> starting at <code>offset</code>.
-         * With send queues enabled a private copy of the bytes (or an empty placeholder for a
-         * null buffer) is queued and the sender is started if it is not running; a closed queue
-         * is logged. Without send queues the data is written directly by the calling thread.
-         */
-        void send(byte[] data, int offset, int length) {
-            if(!use_send_queues) {
-                _send(data, offset, length);
-                return;
-            }
-
-            try {
-                byte[] payload;
-                if(data == null) {
-                    payload=NULL_DATA;
-                }
-                else {
-                    // the caller may reuse its buffer before the sender gets to it, so queue a copy
-                    payload=new byte[length];
-                    System.arraycopy(data, offset, payload, 0, length);
-                }
-                send_queue.add(payload);
-                if(!sender.isRunning())
-                    sender.start();
-            }
-            catch(QueueClosedException e) {
-                log.error(ExternalStrings.ConnectionTable_FAILED_ADDING_MESSAGE_TO_SEND_QUEUE, e);
-            }
-        }
-
-
-        /**
-         * Writes the data to the peer while holding <code>send_mutex</code>, so that sends on this
-         * connection do not interleave. If the write fails with an IOException it is attempted
-         * once more; no failure is propagated to the caller, failures are only logged.
-         * NOTE(review): the warning text mentions re-establishing the connection, but nothing in
-         * this method reconnects - the retry uses the same streams.
-         */
-        protected/*GemStoneAddition*/ void _send(byte[] data, int offset, int length) {
-            synchronized(send_mutex) {
-                try {
-                    doSend(data, offset, length);
-                    updateLastAccessed();
-                }
-                catch(IOException io_ex) {
-                    if(log.isWarnEnabled())
-                        log.warn("peer closed connection, trying to re-establish connection and re-send msg");
-                    try {
-                        // second and last attempt
-                        doSend(data, offset, length);
-                        updateLastAccessed();
-                    }
-                    catch(IOException io_ex2) {
-                         if(log.isErrorEnabled()) log.error(ExternalStrings.ConnectionTable_2ND_ATTEMPT_TO_SEND_DATA_FAILED_TOO);
-                    }
-                    catch(Exception ex2) {
-                         if(log.isErrorEnabled()) log.error(ExternalStrings.ConnectionTable_EXCEPTION_IS__0, ex2);
-                    }
-                }
-                catch(Throwable ex) {
-                     if(log.isErrorEnabled()) log.error(ExternalStrings.ConnectionTable_EXCEPTION_IS__0, ex);
-                }
-            }
-        }
-
-
-        /**
-         * Writes one message to the output stream: the length as an int, then the bytes, then a
-         * flush. Does nothing if there is no output stream. On any failure the error is logged,
-         * this connection's peer is removed from the connection table and the exception is rethrown.
-         */
-        void doSend(byte[] data, int offset, int length) throws Exception {
-            try {
-                // we're using 'double-writes', sending the buffer to the destination in 2 pieces. this would
-                // ensure that, if the peer closed the connection while we were idle, we would get an exception.
-                // this won't happen if we use a single write (see Stevens, ch. 5.13).
-                if(out != null) {
-                    out.writeInt(length); // write the length of the data buffer first
-                    Util.doubleWrite(data, offset, length, out);
-                    out.flush();  // may not be very efficient (but safe)
-                }
-            }
-            catch(Exception ex) {
-                if(log.isErrorEnabled())
-                    log.error(ExternalStrings.ConnectionTable_FAILURE_SENDING_TO__0, peer_addr, ex);
-                remove(peer_addr);
-                throw ex;
-            }
-        }
-
-
-        /**
-         * Reads the peer's address from the input stream. The peer first has to send a cookie
-         * which must match our own cookie, otherwise the connection is refused. The cookie is
-         * followed by the peer's version (a mismatch is only logged as a warning) and the address.
-         *
-         * @param client_sock the accepted socket; only used to name the peer in log and error messages
-         * @return the peer's address, or null if this connection has no input stream
-         * @throws SocketException if the stream ends before the whole cookie was read, or if the
-         *         cookie does not match
-         * @throws Exception if reading the version or the address fails
-         */
-        Address readPeerAddress(Socket client_sock) throws Exception {
-            if(in == null)
-                return null;
-
-            int         client_port=client_sock != null? client_sock.getPort() : 0;
-            InetAddress client_addr=client_sock != null? client_sock.getInetAddress() : null;
-            byte[]      input_cookie=new byte[cookie.length];
-
-            // read the cookie first. readFully() keeps reading until the whole cookie has arrived;
-            // a single read() may legitimately return fewer bytes and would reject a valid peer
-            try {
-                in.readFully(input_cookie, 0, input_cookie.length);
-            }
-            catch(EOFException eof) {
-                SocketException ex=new SocketException("Failed to read input cookie"); // GemStoneAddition
-                ex.initCause(eof);
-                throw ex;
-            }
-            if(!matchCookie(input_cookie))
-                throw new SocketException("ConnectionTable.Connection.readPeerAddress(): cookie sent by " +
-                                          client_addr + ':' + client_port + " does not match own cookie; terminating connection");
-
-            // then read the version
-            short version=in.readShort();
-            if(!JGroupsVersion.compareTo(version)) {
-                if(log.isWarnEnabled())
-                    log.warn(new StringBuffer("packet from ").append(client_addr).append(':').append(client_port).
-                           append(" has different version (").append(version).append(") from ours (").
-                             append(JGroupsVersion.version).append("). This may cause problems"));
-            }
-
-            Address client_peer_addr=new IpAddress();
-            client_peer_addr.readFrom(in);
-
-            updateLastAccessed();
-            return client_peer_addr;
-        }
-
-
-        /**
-         * Sends the cookie first, then our version and our address. If the cookie doesn't match
-         * the receiver's cookie, the receiver will reject the connection and close it.
-         * A null address is logged as a warning and nothing is sent; nothing is sent either if
-         * there is no output stream. Failures while writing are logged, not thrown.
-         */
-        void sendLocalAddress(Address local_addr) {
-            if(local_addr == null) {
-                if(log.isWarnEnabled()) log.warn("local_addr is null");
-                return;
-            }
-            if(out == null)
-                return;
-
-            try {
-                out.write(cookie, 0, cookie.length);     // the cookie
-                out.writeShort(JGroupsVersion.version);  // the version
-                local_addr.writeTo(out);                 // the address itself
-                out.flush(); // needed ?
-                updateLastAccessed();
-            }
-            catch(Throwable t) {
-                if(log.isErrorEnabled()) log.error(ExternalStrings.ConnectionTable_EXCEPTION_IS__0, t);
-            }
-        }
-
-
-        /** Resets every byte of the given cookie buffer to zero; a null buffer is ignored. */
-        void initCookie(byte[] c) {
-            if(c == null)
-                return;
-            java.util.Arrays.fill(c, (byte)0);
-        }
-
-        /**
-         * Compares a received cookie with our own cookie, byte by byte. Returns false when the input
-         * is null or shorter than our cookie, and true only if its first cookie.length bytes are equal.
-         * The received cookie is logged at info level before the comparison.
-         */
-        boolean matchCookie(byte[] input) {
-            if(input == null) return false;
-            if(input.length < cookie.length) return false;
-            if(log.isInfoEnabled()) log.info(ExternalStrings.ConnectionTable_INPUT_COOKIE_IS__0, printCookie(input));
-            int idx=0;
-            while(idx < cookie.length) {
-                if(input[idx] != cookie[idx]) return false;
-                idx++;
-            }
-            return true;
-        }
-
-
-        /** Renders the cookie bytes as a String using the platform default charset; null yields "". */
-        String printCookie(byte[] c) {
-            return c != null ? new String(c) : "";
-        }
-
-
-        /**
-         * Receiver loop. Repeatedly reads a length-prefixed message (an int length followed by that many
-         * bytes) from the input stream and passes it to receive(). The loop ends when the thread is
-         * interrupted, the input stream is null, the length prefix is invalid, or an I/O error/EOF occurs;
-         * afterwards the socket is closed and this connection is removed for peer_addr.
-         */
-        public void run() {
-            byte[] buf=new byte[256]; // start with 256, increase as we go
-            int len=0;
-
-            for (;;) { // GemStoneAddition -- remove coding anti-pattern
-              if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition
-                try {
-                    if(in == null) {
-                        if(log.isErrorEnabled()) log.error(ExternalStrings.ConnectionTable_INPUT_STREAM_IS_NULL_);
-                        break;
-                    }
-                    len=in.readInt();
-                    if(len < 0) {
-                        // A negative length can never be valid. Without this check readFully() throws
-                        // IndexOutOfBoundsException, the generic handler below swallows it, and the loop
-                        // keeps reading from a stream that is no longer aligned on message boundaries.
-                        // Handle it like the oversized-length (OutOfMemoryError) case: drop and close.
-                        if(log.isWarnEnabled()) log.warn("dropped invalid message, closing connection");
-                        break;
-                    }
-                    if(len > buf.length)
-                        buf=new byte[len]; // grow the buffer to fit the message
-                    in.readFully(buf, 0, len);
-                    updateLastAccessed();
-                    receive(peer_addr, buf, 0, len); // calls receiver.receive(msg)
-                }
-                catch(OutOfMemoryError mem_ex) {
-                    // the length prefix asked for more memory than is available
-                    if(log.isWarnEnabled()) log.warn("dropped invalid message, closing connection");
-                    break; // continue;
-                }
-                catch(EOFException eof_ex) {  // peer closed connection
-                    if(log.isInfoEnabled()) log.info(ExternalStrings.ConnectionTable_EXCEPTION_IS__0, eof_ex);
-                    notifyConnectionClosed(peer_addr);
-                    break;
-                }
-                catch(IOException io_ex) {
-                    if(log.isInfoEnabled()) log.info(ExternalStrings.ConnectionTable_EXCEPTION_IS__0, io_ex);
-                    notifyConnectionClosed(peer_addr);
-                    break;
-                }
-                catch(Throwable e) {
-                    // anything else is logged and the loop continues with the next message
-                    if(log.isWarnEnabled()) log.warn("exception is " + e);
-                }
-            }
-            if(log.isTraceEnabled())
-                log.trace("ConnectionTable.Connection.Receiver terminated");
-//            receiverThread=null; GemStoneAddition
-            closeSocket();
-            remove(peer_addr);
-        }
-
-
-        /**
-         * Describes this connection as {@code <local:port --> remote:port> (N secs old)}, where N is the
-         * number of seconds since last_access, or as {@code <null socket>} when there is no socket.
-         */
-        @Override // GemStoneAddition
-        public String toString() {
-            // Take one snapshot of the field. sock gets set to null by closeSocket(), so testing the
-            // field and then reading it a second time could still yield null and throw a
-            // NullPointerException.
-            Socket tmp_sock=sock;
-            if(tmp_sock == null)
-                return "<null socket>";
-
-            InetAddress local=tmp_sock.getLocalAddress();
-            InetAddress remote=tmp_sock.getInetAddress();
-            String local_str=local != null ? Util.shortName(local) : "<null>";
-            String remote_str=remote != null ? Util.shortName(remote) : "<null>";
-            return '<' + local_str + ':' + tmp_sock.getLocalPort() +
-                   " --> " + remote_str + ':' + tmp_sock.getPort() + "> (" +
-                   ((System.currentTimeMillis() - last_access) / 1000) + " secs old)";
-        }
-
-
-        /**
-         * Best-effort shutdown: closes the socket, then the output stream, then the input stream.
-         * Exceptions thrown by any close() are deliberately ignored. sock and in are set to null
-         * afterwards; out is left in place.
-         */
-        void closeSocket() {
-            if(sock != null) {
-                try {
-                    // should actually close in/out (so we don't need to close them explicitly)
-                    sock.close();
-                }
-                catch(Exception ignored) {
-                    // nothing useful can be done if close fails
-                }
-                sock=null;
-            }
-
-            if(out != null) {
-                try {
-                    out.close(); // flushes data
-                }
-                catch(Exception ignored) {
-                    // nothing useful can be done if close fails
-                }
-                // out is intentionally not nulled:
-                // removed 4/22/2003 (request by Roland Kurmann)
-            }
-
-            if(in != null) {
-                try {
-                    in.close();
-                }
-                catch(Exception ignored) {
-                    // nothing useful can be done if close fails
-                }
-                in=null;
-            }
-        }
-
-
-        /**
-         * Runnable that takes byte arrays from send_queue and writes each one with _send().
-         * start(), stop() and isRunning() are synchronized on this Sender to guard senderThread.
-         */
-        class Sender implements Runnable {
-            Thread senderThread; // GemStoneAddition - synchronized on this
-
-            /** Creates and starts a daemon sender thread unless one is already alive. */
-            synchronized /* GemStoneAddition */ void start() {
-                if(senderThread != null && senderThread.isAlive())
-                    return;
-                senderThread=new Thread(thread_group, this, "ConnectionTable.Connection.Sender [" + getSockAddress() + "]");
-                senderThread.setDaemon(true);
-                senderThread.start();
-                if(log.isTraceEnabled())
-                    log.trace("ConnectionTable.Connection.Sender thread started");
-            }
-
-            /** Interrupts the sender thread, if any, and forgets it. */
-            synchronized /* GemStoneAddition */ void stop() {
-                if(senderThread == null)
-                    return;
-                senderThread.interrupt();
-                senderThread=null;
-            }
-
-            /** Returns true while a sender thread exists and is alive. */
-            synchronized /* GemStoneAddition */ boolean isRunning() {
-                if(senderThread == null)
-                    return false;
-                return senderThread.isAlive() /* GemStoneAddition */;
-            }
-
-            /** Sends queued buffers until the thread is interrupted or the queue is closed. */
-            public void run() {
-                while(!Thread.currentThread().isInterrupted()) { // GemStoneAddition
-                    try {
-                        byte[] payload=(byte[])send_queue.remove();
-                        if(payload != null)
-                            _send(payload, 0, payload.length);
-                    }
-                    catch (InterruptedException ie) { // GemStoneAddition
-                        // no need to reset the bit; we're exiting
-                        break; // exit loop and thread
-                    }
-                    catch(QueueClosedException e) {
-                        break;
-                    }
-                }
-                if(log.isTraceEnabled())
-                    log.trace("ConnectionTable.Connection.Sender thread terminated");
-            }
-        }
-
-
-    }
-
-
-    /**
-     * Background task that repeatedly sleeps for reaper_interval and then, while holding the lock on
-     * conns, destroys and removes every connection whose last_access is older than conn_expire_time.
-     * The task ends when its thread is interrupted or when conns is found empty after a sleep.
-     * Access to the thread field t is synchronized on this Reaper.
-     */
-    class Reaper implements Runnable {
-        Thread t=null; // GemStoneAddition synchronize on this to access
-
-        Reaper() {
-        }
-
-        /** Starts a daemon reaper thread unless there are no connections or one is already alive. */
-        public void start() {
-            if(conns.size() == 0)
-                return;
-            synchronized (this) { // GemStoneAddition
-                if(t != null && t.isAlive())
-                    return;
-                //RKU 7.4.2003, put in threadgroup
-                t=new Thread(thread_group, this, "ConnectionTable.ReaperThread");
-                t.setDaemon(true); // will allow us to terminate if all remaining threads are daemons
-                t.start();
-            }
-        }
-
-        /** Interrupts the reaper thread, if any, and forgets it. */
-        public void stop() {
-            synchronized (this) { // GemStoneAddition
-                if(t == null)
-                    return;
-                t.interrupt(); // GemStoneAddition
-                t=null;
-            }
-        }
-
-
-        /** Returns true while a reaper thread exists and is alive. */
-        synchronized /* GemStoneAddition */ public boolean isRunning() {
-            if(t == null)
-                return false;
-            return t.isAlive() /* GemStoneAddition */;
-        }
-
-        public void run() {
-            if(log.isInfoEnabled()) log.info("connection reaper thread was started. Number of connections=" +
-                                             conns.size() + ", reaper_interval=" + reaper_interval + ", conn_expire_time=" +
-                                             conn_expire_time);
-
-            for (;;) { // GemStoneAddition remove coding anti-pattern
-                // first sleep
-                try { // GemStoneAddition
-                    Util.sleep(reaper_interval);
-                }
-                catch (InterruptedException e) {
-                    // Thread.currentThread().interrupt(); not needed; we're exiting
-                    break; // exit loop and thread
-                }
-                synchronized(conns) {
-                    if (conns.size() == 0) break; // GemStoneAddition
-                    long now=System.currentTimeMillis();
-                    Iterator it=conns.entrySet().iterator();
-                    while(it.hasNext()) {
-                        Connection conn=(Connection)((Map.Entry)it.next()).getValue();
-                        if(log.isInfoEnabled()) log.info("connection is " +
-                                                         ((now - conn.last_access) / 1000) + " seconds old (curr-time=" +
-                                                         now + ", last_access=" + conn.last_access + ')');
-                        if(conn.last_access + conn_expire_time >= now)
-                            continue; // still fresh enough, keep it
-                        if(log.isInfoEnabled()) log.info("connection " + conn +
-                                                         " has been idle for too long (conn_expire_time=" + conn_expire_time +
-                                                         "), will be removed");
-                        conn.destroy();
-                        it.remove();
-                    }
-                }
-            }
-            if(log.isInfoEnabled()) log.info(ExternalStrings.ConnectionTable_REAPER_TERMINATED);
-        }
-    }
-
-
-}
-