You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/08/21 23:23:12 UTC
[48/51] [partial] incubator-geode git commit: GEODE-77 removing the
old jgroups subproject
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/View.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/View.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/View.java
deleted file mode 100644
index 5a8eada..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/View.java
+++ /dev/null
@@ -1,528 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: View.java,v 1.10 2005/08/08 09:48:06 belaban Exp $
-
-package com.gemstone.org.jgroups;
-
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.Vector;
-
-import com.gemstone.org.jgroups.protocols.pbcast.Digest;
-import com.gemstone.org.jgroups.stack.IpAddress;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.Streamable;
-import com.gemstone.org.jgroups.util.StreamableFixedID;
-import com.gemstone.org.jgroups.util.Util;
-import com.gemstone.org.jgroups.util.VersionedStreamable;
-
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-
-/**
- * A view is a local representation of the current membership of a group.
- * Only one view is installed in a channel at a time.
- * Views contain the address of its creator, an ID and a list of member addresses.
- * These adresses are ordered, and the first address is always the coordinator of the view.
- * This way, each member of the group knows who the new coordinator will be if the current one
- * crashes or leaves the group.
- * The views are sent between members using the VIEW_CHANGE event.
- */
-public class View implements Externalizable, Cloneable, StreamableFixedID {
-
- /** GemStoneAddition - added for size check on views that carry;
- * credentials
- */
-// public static int MAX_VIEW_SIZE = 60000;
-
- /* A view is uniquely identified by its ViewID
- * The view id contains the creator address and a Lamport time.
- * The Lamport time is the highest timestamp seen or sent from a view.
- * if a view change comes in with a lower Lamport time, the event is discarded.
- */
- protected ViewId vid;
-
- /**
- * A list containing all the members of the view
- * This list is always ordered, with the coordinator being the first member.
- * the second member will be the new coordinator if the current one disappears
- * or leaves the group.
- */
- protected Vector members;
-
- /**
- * GemStoneAddition -- any additional payload to be sent
- * Used by the security AUTH layer to add/verify credentials.
- */
- private Object additionalData;
-
- /**
- * GemStoneAddition - size of serialized form of additionalData
- */
- private int additionalDataSize;
-
- /**
- * GemStoneAddition - Members removed from previous view as Suspects
- */
- private Set suspectedMembers;
-
- /**
- * GemStoneAddition - message digest moved from GmsHeader for FRAG3
- * fragmentation
- */
- private Digest messageDigest;
-
- /**
- * creates an empty view, should not be used
- */
- public View() {
- }
-
-
- /**
- * Creates a new view
- *
- * @param vid The view id of this view (can not be null)
- * @param members Contains a list of all the members in the view, can be empty but not null.
- */
- public View(ViewId vid, Vector members) {
- this.vid=vid;
- this.members=members;
- }
-
-
- /**
- * Creates a new view
- *
- * @param creator The creator of this view (can not be null)
- * @param id The lamport timestamp of this view
- * @param members Contains a list of all the members in the view, can be empty but not null.
- */
- public View(Address creator, long id, Vector members) {
- this(new ViewId(creator, id), members);
- }
-
- /**
- * Creates a new view
- *
- * @param creator The creator of this view (can not be null)
- * @param id The lamport timestamp of this view
- * @param members Contains a list of all the members in the view, can be empty but not null.
- * @param suspectedMembers GemStoneAddition - tracking of ousted members
- */
- public View(Address creator, long id, Vector members, Vector suspectedMembers) {
- this(new ViewId(creator, id), members);
- if (suspectedMembers != null) {
- this.suspectedMembers = new HashSet(suspectedMembers);
- }
- }
-
-
- /**
- * returns the view ID of this view
- * if this view was created with the empty constructur, null will be returned
- *
- * @return the view ID of this view
- */
- public ViewId getVid() {
- return vid;
- }
-
- /**
- * returns the creator of this view
- * if this view was created with the empty constructur, null will be returned
- *
- * @return the creator of this view in form of an Address object
- */
- public Address getCreator() {
- return vid != null ? vid.getCoordAddress() : null;
- }
-
- /**
- * returns the coordinator of this view, which may not be the creator if it is not in the view.
- * GemStoneAddition
- */
- public Address getCoordinator() {
-
- if (this.members.size() < 1) {
- return null;
- }
-
- return new Membership(this.members).getCoordinator();
- }
-
- /**
- * Returns a reference to the List of members (ordered)
- * Do NOT change this list, hence your will invalidate the view
- * Make a copy if you have to modify it.
- *
- * @return a reference to the ordered list of members in this view
- */
- public Vector getMembers() {
- return members;
- }
-
- /**
- * GemStoneAddition
- * get the members that were removed from the previous view due to
- * Suspect processing
- */
- public Set getSuspectedMembers() {
- return this.suspectedMembers == null? Collections.EMPTY_SET : this.suspectedMembers;
- }
-
- /**
- * GemStoneAddition -- getter for additional data
- */
- public Object getAdditionalData() {
- return this.additionalData;
- }
-
- /**
- * GemStoneAddition -- setter for additional data
- */
- public void setAdditionalData(Object data) {
- this.additionalData = data;
- // until credential fragmentation is implemented, we must perform
- // a size check to make sure the view fits into a datagram
- try {
- ByteArrayOutputStream bas = new ByteArrayOutputStream(10000);
- ObjectOutputStream oos = new ObjectOutputStream(bas);
- oos.writeObject(data);
- this.additionalDataSize = bas.size();
-// if (serializedSize() > MAX_VIEW_SIZE) {
-// this.additionalData = null;
-// this.additionalDataSize = 0;
-// throw new IllegalArgumentException(
-// JGroupsStrings.View_SERIALIZED_VIEW_SIZE_0_EXCEEDS_MAXIMUM_OF_1
-// .toLocalizedString(new Object[] { Integer.valueOf(bas.size()), Integer.valueOf(MAX_VIEW_SIZE) }));
-// }
- }
- catch (IOException e) {
- // ignore - this will happen again when the view is serialized
- // for transmission
- }
- }
-
- /**
- * returns true, if this view contains a certain member
- *
- * @param mbr - the address of the member,
- * @return true if this view contains the member, false if it doesn't
- * if the argument mbr is null, this operation returns false
- */
- public boolean containsMember(Address mbr) {
- if(mbr == null || members == null) {
- return false;
- }
- return members.contains(mbr);
- }
-
- /**
- * GemStoneAddition - removes the given address from the set of suspected members
- * @param mbr
- */
- public void notSuspect(Address mbr) {
- this.suspectedMembers.remove(mbr);
- }
-
- /**
- * GemStoneAddition - retrieve the address in the view that corresponds
- * to the given address. If the address is not in the view, return
- * the argument.
- */
- public Address getMember(Address addr) {
- int sz = members.size();
- // reverse search to find the newest member that matches, in case
- // of membership race condition that includes an old ID and a new ID
- // for a member that's joining
- for (int i=sz-1; i>0; i--) {
- Address mbr = (Address)members.get(i);
- if (mbr.equals(addr)) {
- return mbr;
- }
- }
- return addr;
- }
-
-
- @Override // GemStoneAddition
- public boolean equals(Object obj) {
- if(obj == null)
- return false;
- if(vid != null) {
- if (!(obj instanceof View)) return false; // GemStoneAddition
- int rc=vid.compareTo(((View)obj).vid);
- if(rc != 0)
- return false;
- if(members != null && ((View)obj).members != null) {
- return members.equals(((View)obj).members);
- }
- }
- return false;
- }
-
- /*
- * (non-Javadoc)
- * @see java.lang.Object#hashCode()
- *
- * Note that we just need to make sure that equal objects return equal
- * hashcodes; nothing really elaborate is done here.
- */
- @Override // GemStoneAddition
- public int hashCode() { // GemStoneAddition
- int result = 0;
- if (vid != null) {
- result += vid.hashCode();
- if (members != null) {
- result += members.hashCode();
- }
- }
- return result;
- }
-
- /**
- * returns the number of members in this view
- *
- * @return the number of members in this view 0..n
- */
- public int size() {
- return members == null ? 0 : members.size();
- }
-
-
- /**
- * creates a copy of this view
- *
- * @return a copy of this view
- */
- @Override // GemStoneAddition
- @SuppressFBWarnings(value="CN_IDIOM_NO_SUPER_CALL")
- public Object clone() {
- ViewId vid2=vid != null ? (ViewId)vid.clone() : null;
- Vector members2=members != null ? (Vector)members.clone() : null;
- View result = new View(vid2, members2);
- if (this.suspectedMembers != null) {
- result.suspectedMembers = new HashSet(this.suspectedMembers);
- }
- if (additionalData != null) {
- result.additionalData = additionalData;
- result.additionalDataSize = additionalDataSize;
- }
- return result;
- }
-
-
- /**
- * debug only
- */
- public String printDetails() {
- StringBuffer ret=new StringBuffer();
- ret.append(vid).append("\n\t");
- if(members != null) {
- for(int i=0; i < members.size(); i++) {
- ret.append(members.elementAt(i)).append("\n\t");
- }
- ret.append('\n');
- }
- return ret.toString();
- }
-
-
- // GemStoneAddition - get the member name for an address
- private String memberName(Address m) {
- if (!(m instanceof IpAddress))
- return m.toString();
- IpAddress im = (IpAddress)m;
- StringBuffer sb = new StringBuffer();
-
- sb.append(im.toString());
- int port = im.getDirectPort();
- if (port > 0) {
- sb.append('/');
- sb.append(port);
- }
- return sb.toString();
- }
-
-
- @Override // GemStoneAddition
- public String toString() {
- StringBuffer ret=new StringBuffer(64);
- // GemStoneAddition - give more info than jgroups defaults to giving
- ret.append(vid);
- ret.append(" [");
- for (Iterator iter = members.iterator(); iter.hasNext(); ) {
- Address member = (Address) iter.next();
- // GemStoneAddition
- ret.append(memberName(member));
-
- if (iter.hasNext()) {
- ret.append(", ");
- }
- }
- // GemStoneAddition
- if (this.additionalData != null) {
- ret.append(this.additionalData);
- }
- ret.append("]");
- if (this.suspectedMembers != null && this.suspectedMembers.size() > 0) {
- ret.append(" crashed mbrs: [");
- for (Iterator it=this.suspectedMembers.iterator(); it.hasNext(); ) {
- ret.append(memberName((Address)it.next()));
- if (it.hasNext()) {
- ret.append(", ");
- }
- }
- ret.append(']');
- }
- //ret.append(vid).append(" ").append(members);
- return ret.toString();
- }
- //public String toString() {
- // StringBuffer ret=new StringBuffer(64);
- // ret.append(vid).append(" ").append(members);
- // return ret.toString();
- //}
-
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(vid);
- out.writeObject(members);
- out.writeObject(this.suspectedMembers); // GemStoneAddition
- out.writeInt(this.additionalDataSize);
- out.writeObject(this.additionalData); // GemStoneAddition
- }
-
-
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- vid=(ViewId)in.readObject();
- members=(Vector)in.readObject();
- this.suspectedMembers = (Set)in.readObject(); // GemStoneAddition
- this.additionalDataSize = in.readInt();
- this.additionalData = in.readObject(); // GemStoneAddition
- }
-
- /** GemStoneAddition - find the lead member in this view */
- public Address getLeadMember() {
- for (int i=0; i<members.size(); i++) {
- Address mbr = (Address)members.get(i);
- if (((IpAddress)mbr).getVmKind() == 10) {
- return mbr;
- }
- }
- return null;
- }
-
-
- public void writeTo(DataOutputStream out) throws IOException {
- JChannel.getGfFunctions().invokeToData(this, out);
- }
-
- public void toData(DataOutput out) throws IOException {
- // vid
- if(vid != null) {
- out.writeBoolean(true);
- JChannel.getGfFunctions().invokeToData(vid, out);
- }
- else
- out.writeBoolean(false);
-
- // members:
- JChannel.getGfFunctions().writeObject(members, out);
- // GemStoneAddition - suspectedMembers
- if (this.suspectedMembers == null) {
- out.writeBoolean(false);
- }
- else {
- out.writeBoolean(true);
- JChannel.getGfFunctions().writeObject(this.suspectedMembers, out);
- }
- JChannel.getGfFunctions().writeObject(this.messageDigest, out);
- // GemStoneAddition
- out.writeInt(this.additionalDataSize);
- JChannel.getGfFunctions().writeObject(this.additionalData, out);
- }
-
-
- public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
- try {
- JChannel.getGfFunctions().invokeFromData(this, in);
- } catch (ClassNotFoundException ex) {
- throw new IllegalAccessException(
- ExternalStrings.View_COULD_NOT_READ_ADDITIONAL_DATA_0
- .toLocalizedString(ex));
- }
-
- }
-
- public int getDSFID() {
- return JGROUPS_VIEW;
- }
-
- public short[] getSerializationVersions() {
- return null;
- }
-
- public void fromData(DataInput in) throws IOException, ClassNotFoundException {
- boolean b;
- // vid:
- b=in.readBoolean();
- if(b) {
- vid=new ViewId();
- JChannel.getGfFunctions().invokeFromData(vid, in);
- }
-
- // members:
- members=JChannel.getGfFunctions().readObject(in);
-
- // GemStoneAddition - suspectedMembers
- if (in.readBoolean()) {
- this.suspectedMembers = JChannel.getGfFunctions().readObject(in);
- }
- this.messageDigest = JChannel.getGfFunctions().readObject(in);
- // GemStoneAddition
- this.additionalDataSize = in.readInt();
- this.additionalData = JChannel.getGfFunctions().readObject(in);
- }
-
- public int serializedSize(short version) {
- int retval=Global.BYTE_SIZE; // presence for vid
- if(vid != null)
- retval+=vid.serializedSize(version);
- retval+=Util.size(members,version);
- // GemStoneAddition - suspectedMembers
- if (this.suspectedMembers != null) {
- retval+=Util.size(this.suspectedMembers, version);
- }
- if (this.additionalData != null) {
- retval += this.additionalDataSize;
- }
- return retval;
- }
-
-
- public void setMessageDigest(Digest messageDigest) {
- this.messageDigest = messageDigest;
- }
-
-
- public Digest getMessageDigest() {
- return messageDigest;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/ViewId.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/ViewId.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/ViewId.java
deleted file mode 100644
index 48ff4c8..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/ViewId.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: ViewId.java,v 1.10 2005/07/12 11:45:42 belaban Exp $
-
-package com.gemstone.org.jgroups;
-
-import com.gemstone.org.jgroups.util.Streamable;
-import com.gemstone.org.jgroups.util.Util;
-import com.gemstone.org.jgroups.util.VersionedStreamable;
-
-import java.io.*;
-
-
-/**
- * ViewIds are used for ordering views (each view has a ViewId and a list of members).
- * Ordering between views is important for example in a virtual synchrony protocol where
- * all views seen by a member have to be ordered.
- */
-public class ViewId implements Externalizable, Comparable, Cloneable, VersionedStreamable {
- Address coord_addr=null; // Address of the issuer of this view
- long id=0; // Lamport time of the view
-
-
- public ViewId() { // used for externalization
- }
-
-
- /**
- * Creates a ViewID with the coordinator address and a Lamport timestamp of 0.
- *
- * @param coord_addr the address of the member that issued this view
- */
- public ViewId(Address coord_addr) {
- this.coord_addr=coord_addr;
- }
-
- /**
- * Creates a ViewID with the coordinator address and the given Lamport timestamp.
- *
- * @param coord_addr - the address of the member that issued this view
- * @param id - the Lamport timestamp of the view
- */
- public ViewId(Address coord_addr, long id) {
- this.coord_addr=coord_addr;
- this.id=id;
- }
-
- /**
- * returns the lamport time of the view
- *
- * @return the lamport time timestamp
- */
- public long getId() {
- return id;
- }
-
-
- /**
- * returns the address of the member that issued this view
- *
- * @return the Address of the the issuer
- */
- public Address getCoordAddress() {
- return coord_addr;
- }
-
-
- @Override // GemStoneAddition
- public String toString() {
- return "[" + coord_addr + '|' + id + ']';
- }
-
- /**
- * Cloneable interface
- * Returns a new ViewID object containing the same address and lamport timestamp as this view
- */
- @Override // GemStoneAddition
- public Object clone() {
- return new ViewId(coord_addr, id);
- }
-
- /**
- * Old Copy method, deprecated because it is substituted by clone()
- */
- public ViewId copy() {
- return (ViewId)clone();
- }
-
- /**
- * Establishes an order between 2 ViewIds. First compare on id. <em>Compare on coord_addr
- * only if necessary</em> (i.e. ids are equal) !
- *
- * @return 0 for equality, value less than 0 if smaller, greater than 0 if greater.
- */
- public int compareTo(Object other) {
- if(other == null) return 1; //+++ Maybe necessary to throw an exception
-
- if(!(other instanceof ViewId)) {
- throw new ClassCastException("ViewId.compareTo(): view id is not comparable with different Objects");
- }
- return id > ((ViewId)other).id ? 1 : id < ((ViewId)other).id ? -1 : 0;
- }
-
- /**
- * Old Compare
- */
- public int compare(Object o) {
- return compareTo(o);
- }
-
-
- @Override // GemStoneAddition
- public boolean equals(Object other_view) {
- if (other_view == null || !(other_view instanceof ViewId)) return false; // GemStoneAddition
- return compareTo(other_view) == 0 ? true : false;
- }
-
-
- @Override // GemStoneAddition
- public int hashCode() {
- return (int)id;
- }
-
-
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(coord_addr);
- out.writeLong(id);
- }
-
-
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- coord_addr=(Address)in.readObject();
- id=in.readLong();
- }
-
- public void writeTo(DataOutputStream out) throws IOException {
- JChannel.getGfFunctions().invokeToData(this, out);
- }
-
- public void toData(DataOutput out) throws IOException {
- JChannel.getGfFunctions().writeObject(coord_addr, out);
- out.writeLong(id);
- }
-
- public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
- try {
- JChannel.getGfFunctions().invokeFromData(this, in);
- } catch (Exception e) {
- InstantiationException ex = new InstantiationException("problem deserializing ViewId");
- ex.initCause(e);
- throw ex;
- }
- }
-
- public void fromData(DataInput in) throws IOException, ClassNotFoundException {
- coord_addr=JChannel.getGfFunctions().readObject(in);
- id=in.readLong();
- }
-
- public int serializedSize(short version) {
- int retval=Global.LONG_SIZE; // for the id
- retval+=Util.size(coord_addr,version);
- return retval;
- }
-
-
- @Override
- public short[] getSerializationVersions() {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/ConnectionTable.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/ConnectionTable.java
deleted file mode 100644
index 2b4e980..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/ConnectionTable.java
+++ /dev/null
@@ -1,1051 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: ConnectionTable.java,v 1.39 2005/11/18 19:50:54 smarlownovell Exp $
-
-package com.gemstone.org.jgroups.blocks;
-
-import com.gemstone.org.jgroups.util.GemFireTracer;
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.JGroupsVersion;
-import com.gemstone.org.jgroups.stack.IpAddress;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.Queue;
-import com.gemstone.org.jgroups.util.QueueClosedException;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.io.*;
-import java.net.*;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Vector;
-
-
-/**
- * Manages incoming and outgoing TCP connections. For each outgoing message to destination P, if there
- * is not yet a connection for P, one will be created. Subsequent outgoing messages will use this
- * connection. For incoming messages, one server socket is created at startup. For each new incoming
- * client connecting, a new thread from a thread pool is allocated and listens for incoming messages
- * until the socket is closed by the peer.<br>Sockets/threads with no activity will be killed
- * after some time.
- * <p>
- * Incoming messages from any of the sockets can be received by setting the message listener.
- * @author Bela Ban
- */
-public class ConnectionTable implements Runnable {
- final HashMap conns=new HashMap(); // keys: Addresses (peer address), values: Connection
- Receiver receiver=null;
- ServerSocket srv_sock=null;
-// boolean reuse_addr=false; GemStoneAddition
- boolean use_send_queues=true;
- InetAddress bind_addr=null;
-
- /**
- * The address which will be broadcast to the group (the externally visible address which this host should
- * be contacted on). If external_addr is null, it will default to the same address that the server socket is bound to.
- */
- InetAddress external_addr=null;
- Address local_addr=null; // bind_addr + port of srv_sock
- int srv_port=7800;
- int max_port=0; // maximum port to bind to (if < srv_port, no limit)
- Thread acceptor=null; // continuously calls srv_sock.accept()
- static final int backlog=20; // 20 conn requests are queued by ServerSocket (addtl will be discarded)
- int recv_buf_size=120000;
- int send_buf_size=60000;
- final Vector conn_listeners=new Vector(); // listeners to be notified when a conn is established/torn down
- final Object recv_mutex=new Object(); // to serialize simultaneous access to receive() from multiple Connections
- Reaper reaper=null; // closes conns that have been idle for more than n secs
- long reaper_interval=60000; // reap unused conns once a minute
- long conn_expire_time=300000; // connections can be idle for 5 minutes before they are reaped
- boolean use_reaper=false; // by default we don't reap idle conns
- int sock_conn_timeout=1000; // max time in millis to wait for Socket.connect() to return
- ThreadGroup thread_group=null;
- protected final GemFireTracer log=GemFireTracer.getLog(getClass());
- final static byte[] NULL_DATA={};
- final byte[] cookie={'b', 'e', 'l', 'a'};
-
-
-
- /** Used for message reception. */
- public interface Receiver {
- void receive(Address sender, byte[] data, int offset, int length);
- }
-
-
-
- /** Used to be notified about connection establishment and teardown. */
- public interface ConnectionListener {
- void connectionOpened(Address peer_addr);
- void connectionClosed(Address peer_addr);
- }
-
-
- /**
- * Regular ConnectionTable without expiration of idle connections
- * @param srv_port The port on which the server will listen. If this port is reserved, the next
- * free port will be taken (incrementing srv_port).
- */
- public ConnectionTable(int srv_port) throws Exception {
- this.srv_port=srv_port;
- start();
- }
-
-
- public ConnectionTable(InetAddress bind_addr, int srv_port) throws Exception {
- this.srv_port=srv_port;
- this.bind_addr=bind_addr;
- start();
- }
-
-
- /**
- * ConnectionTable including a connection reaper. Connections that have been idle for more than conn_expire_time
- * milliseconds will be closed and removed from the connection table. On next access they will be re-created.
- * @param srv_port The port on which the server will listen
- * @param reaper_interval Number of milliseconds to wait for reaper between attepts to reap idle connections
- * @param conn_expire_time Number of milliseconds a connection can be idle (no traffic sent or received until
- * it will be reaped
- */
- public ConnectionTable(int srv_port, long reaper_interval, long conn_expire_time) throws Exception {
- this.srv_port=srv_port;
- this.reaper_interval=reaper_interval;
- this.conn_expire_time=conn_expire_time;
- use_reaper=true;
- start();
- }
-
-
- /**
- * Create a ConnectionTable
- * @param r A reference to a receiver of all messages received by this class. Method <code>receive()</code>
- * will be called.
- * @param bind_addr The host name or IP address of the interface to which the server socket will bind.
- * This is interesting only in multi-homed systems. If bind_addr is null, the
- * server socket will bind to the first available interface (e.g. /dev/hme0 on
- * Solaris or /dev/eth0 on Linux systems).
- * @param external_addr The address which will be broadcast to the group (the externally visible address
- * which this host should be contacted on). If external_addr is null, it will default to
- * the same address that the server socket is bound to.
- * @param srv_port The port to which the server socket will bind to. If this port is reserved, the next
- * free port will be taken (incrementing srv_port).
- * @param max_port The largest port number that the server socket will be bound to. If max_port < srv_port
- * then there is no limit.
- */
- public ConnectionTable(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port) throws Exception {
- setReceiver(r);
- this.bind_addr=bind_addr;
- this.external_addr=external_addr;
- this.srv_port=srv_port;
- this.max_port=max_port;
- start();
- }
-
-
- /**
- * ConnectionTable including a connection reaper. Connections that have been idle for more than conn_expire_time
- * milliseconds will be closed and removed from the connection table. On next access they will be re-created.
- *
- * @param r The Receiver
- * @param bind_addr The host name or IP address of the interface to which the server socket will bind.
- * This is interesting only in multi-homed systems. If bind_addr is null, the
- * server socket will bind to the first available interface (e.g. /dev/hme0 on
- * Solaris or /dev/eth0 on Linux systems).
- * @param external_addr The address which will be broadcast to the group (the externally visible address
- * which this host should be contacted on). If external_addr is null, it will default to
- * the same address that the server socket is bound to.
- * @param srv_port The port to which the server socket will bind to. If this port is reserved, the next
- * free port will be taken (incrementing srv_port).
- * @param max_port The largest port number that the server socket will be bound to. If max_port < srv_port
- * then there is no limit.
- * @param reaper_interval Number of milliseconds to wait for reaper between attepts to reap idle connections
- * @param conn_expire_time Number of milliseconds a connection can be idle (no traffic sent or received until
- * it will be reaped
- */
- public ConnectionTable(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port,
- long reaper_interval, long conn_expire_time) throws Exception {
- setReceiver(r);
- this.bind_addr=bind_addr;
- this.external_addr=external_addr;
- this.srv_port=srv_port;
- this.max_port=max_port;
- this.reaper_interval=reaper_interval;
- this.conn_expire_time=conn_expire_time;
- use_reaper=true;
- start();
- }
-
-
- public void setReceiver(Receiver r) {
- receiver=r;
- }
-
-
- public void addConnectionListener(ConnectionListener l) {
- if(l != null && !conn_listeners.contains(l))
- conn_listeners.addElement(l);
- }
-
-
- public void removeConnectionListener(ConnectionListener l) {
- if(l != null) conn_listeners.removeElement(l);
- }
-
-
- public Address getLocalAddress() {
- if(local_addr == null)
- local_addr=bind_addr != null ? new IpAddress(bind_addr, srv_port) : null;
- return local_addr;
- }
-
-
- public int getSendBufferSize() {
- return send_buf_size;
- }
-
- public void setSendBufferSize(int send_buf_size) {
- this.send_buf_size=send_buf_size;
- }
-
- public int getReceiveBufferSize() {
- return recv_buf_size;
- }
-
- public void setReceiveBufferSize(int recv_buf_size) {
- this.recv_buf_size=recv_buf_size;
- }
-
- public int getSocketConnectionTimeout() {
- return sock_conn_timeout;
- }
-
- public void setSocketConnectionTimeout(int sock_conn_timeout) {
- this.sock_conn_timeout=sock_conn_timeout;
- }
-
- public int getNumConnections() {
- synchronized (conns) { // GemStoneAddition
- return conns.size();
- }
- }
-
- public boolean getUseSendQueues() {return use_send_queues;}
- public void setUseSendQueues(boolean flag) {this.use_send_queues=flag;}
-
-
-
- public void send(Address dest, byte[] data, int offset, int length) throws Exception {
- Connection conn;
- if(dest == null) {
- if(log.isErrorEnabled())
- log.error(ExternalStrings.ConnectionTable_DESTINATION_IS_NULL);
- return;
- }
-
- if(data == null) {
- log.warn("data is null; discarding packet");
- return;
- }
-
- // 1. Try to obtain correct Connection (or create one if not yet existent)
- try {
- conn=getConnection(dest);
- if(conn == null) return;
- }
- catch(Throwable ex) {
- throw new Exception("connection to " + dest + " could not be established", ex);
- }
-
- // 2. Send the message using that connection
- try {
- conn.send(data, offset, length);
- }
- catch(Throwable ex) {
- ex.printStackTrace();
- if(log.isTraceEnabled())
- log.trace("sending msg to " + dest + " failed (" + ex.getClass().getName() + "); removing from connection table");
- remove(dest);
- }
- }
-
-
- /** Try to obtain correct Connection (or create one if not yet existent) */
- Connection getConnection(Address dest) throws Exception {
- Connection conn=null;
- Socket sock;
-
- synchronized(conns) {
- conn=(Connection)conns.get(dest);
- if(conn == null) {
- // changed by bela Jan 18 2004: use the bind address for the client sockets as well
- SocketAddress tmpBindAddr=new InetSocketAddress(bind_addr, 0);
- InetAddress tmpDest=((IpAddress)dest).getIpAddress();
- SocketAddress destAddr=new InetSocketAddress(tmpDest, ((IpAddress)dest).getPort());
- sock=new Socket();
- sock.bind(tmpBindAddr);
- sock.connect(destAddr, sock_conn_timeout);
-
- try {
- sock.setSendBufferSize(send_buf_size);
- }
- catch(IllegalArgumentException ex) {
- if(log.isErrorEnabled()) log.error("exception setting send buffer size to " +
- send_buf_size + " bytes", ex);
- }
- try {
- sock.setReceiveBufferSize(recv_buf_size);
- }
- catch(IllegalArgumentException ex) {
- if(log.isErrorEnabled()) log.error("exception setting receive buffer size to " +
- send_buf_size + " bytes", ex);
- }
- conn=new Connection(sock, dest);
- conn.sendLocalAddress(local_addr);
- notifyConnectionOpened(dest);
- // conns.put(dest, conn);
- addConnection(dest, conn);
- conn.init();
- if(log.isInfoEnabled()) log.info(ExternalStrings.ConnectionTable_CREATED_SOCKET_TO__0, dest);
- }
- return conn;
- }
- }
-
-
- public void start() throws Exception {
- init();
- srv_sock=createServerSocket(srv_port, max_port);
-
- if (external_addr!=null)
- local_addr=new IpAddress(external_addr, srv_sock.getLocalPort());
- else if (bind_addr != null)
- local_addr=new IpAddress(bind_addr, srv_sock.getLocalPort());
- else
- local_addr=new IpAddress(srv_sock.getLocalPort());
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.ConnectionTable_SERVER_SOCKET_CREATED_ON__0, local_addr);
-
- //Roland Kurmann 4/7/2003, build new thread group
- thread_group = new ThreadGroup(Thread.currentThread().getThreadGroup(), "ConnectionTableGroup");
- //Roland Kurmann 4/7/2003, put in thread_group
- acceptor=new Thread(thread_group, this, "ConnectionTable.AcceptorThread");
- acceptor.setDaemon(true);
- acceptor.start();
-
- // start the connection reaper - will periodically remove unused connections
- if(use_reaper && reaper == null) {
- reaper=new Reaper();
- reaper.start();
- }
- }
-
- protected void init() throws Exception {
- }
-
- /** Closes all open sockets, the server socket and all threads waiting for incoming messages */
- public void stop() {
- Iterator it=null;
- Connection conn;
- ServerSocket tmp;
-
- // 1. close the server socket (this also stops the acceptor thread)
- if(srv_sock != null) {
- try {
- tmp=srv_sock;
- srv_sock=null;
- tmp.close();
- }
- catch(Exception e) {
- }
- }
-
-
- // 2. then close the connections
- synchronized(conns) {
- it=conns.values().iterator();
- while(it.hasNext()) {
- conn=(Connection)it.next();
- conn.destroy();
- }
- conns.clear();
- }
- local_addr=null;
- }
-
-
- /**
- Remove <code>addr</code>from connection table. This is typically triggered when a member is suspected.
- */
- public void remove(Address addr) {
- Connection conn;
-
- synchronized(conns) {
- conn=(Connection)conns.remove(addr);
- }
-
- if(conn != null) {
- try {
- conn.destroy(); // won't do anything if already destroyed
- }
- catch(Exception e) {
- }
- }
- if(log.isTraceEnabled()) log.trace("removed " + addr + ", connections are " + toString());
- }
-
-
- /**
- * Acceptor thread. Continuously accept new connections. Create a new thread for each new
- * connection and put it in conns. When the thread should stop, it is
- * interrupted by the thread creator.
- */
- public void run() {
- Socket client_sock;
- Connection conn=null;
- Address peer_addr;
-
- while(srv_sock != null) {
- try {
- client_sock=srv_sock.accept();
- if(log.isTraceEnabled())
- log.trace("accepted connection from " + client_sock.getInetAddress() + ":" + client_sock.getPort());
- try {
- client_sock.setSendBufferSize(send_buf_size);
- }
- catch(IllegalArgumentException ex) {
- if(log.isErrorEnabled()) log.error("exception setting send buffer size to " +
- send_buf_size + " bytes", ex);
- }
- try {
- client_sock.setReceiveBufferSize(recv_buf_size);
- }
- catch(IllegalArgumentException ex) {
- if(log.isErrorEnabled()) log.error("exception setting receive buffer size to " +
- send_buf_size + " bytes", ex);
- }
-
- // create new thread and add to conn table
- conn=new Connection(client_sock, null); // will call receive(msg)
- // get peer's address
- peer_addr=conn.readPeerAddress(client_sock);
-
- // client_addr=new IpAddress(client_sock.getInetAddress(), client_port);
- conn.setPeerAddress(peer_addr);
-
- synchronized(conns) {
- if(conns.containsKey(peer_addr)) {
- if(log.isTraceEnabled())
- log.trace(peer_addr + " is already there, will reuse connection");
- //conn.destroy();
- //continue; // return; // we cannot terminate the thread (bela Sept 2 2004)
- }
- else {
- // conns.put(peer_addr, conn);
- addConnection(peer_addr, conn);
- notifyConnectionOpened(peer_addr);
- }
- }
-
- conn.init(); // starts handler thread on this socket
- }
- catch(SocketException sock_ex) {
- if(log.isInfoEnabled()) log.info(ExternalStrings.ConnectionTable_EXCEPTION_IS__0, sock_ex);
- if(conn != null)
- conn.destroy();
- if(srv_sock == null)
- break; // socket was closed, therefore stop
- }
- catch(Throwable ex) {
- if(log.isWarnEnabled()) log.warn("exception is " + ex);
- if(srv_sock == null)
- break; // socket was closed, therefore stop
- }
- }
- if(log.isTraceEnabled())
- log.trace(Thread.currentThread().getName() + " terminated");
- }
-
-
- /**
- * Calls the receiver callback. We serialize access to this method because it may be called concurrently
- * by several Connection handler threads. Therefore the receiver doesn't need to synchronize.
- */
- public void receive(Address sender, byte[] data, int offset, int length) {
- if(receiver != null) {
- synchronized(recv_mutex) {
- receiver.receive(sender, data, offset, length);
- }
- }
- else
- if(log.isErrorEnabled()) log.error(ExternalStrings.ConnectionTable_RECEIVER_IS_NULL_NOT_SET_);
- }
-
-
- @Override // GemStoneAddition
- public String toString() {
- StringBuffer ret=new StringBuffer();
- Address key;
- Connection val;
- Map.Entry entry;
- HashMap copy;
-
- synchronized(conns) {
- copy=new HashMap(conns);
- }
- ret.append("connections (" + copy.size() + "):\n");
- for(Iterator it=copy.entrySet().iterator(); it.hasNext();) {
- entry=(Map.Entry)it.next();
- key=(Address)entry.getKey();
- val=(Connection)entry.getValue();
- ret.append("key: " + key + ": " + val + '\n');
- }
- ret.append('\n');
- return ret.toString();
- }
-
-
- /** Finds first available port starting at start_port and returns server socket.
- * Will not bind to port >end_port. Sets srv_port */
- protected ServerSocket createServerSocket(int start_port, int end_port) throws Exception {
- ServerSocket ret=null;
-
- while(true) {
- try {
- if(bind_addr == null)
- ret=new ServerSocket(start_port);
- else {
-
- ret=new ServerSocket(start_port, backlog, bind_addr);
- }
- }
- catch(SocketException bind_ex) {
- // GemStoneAddition
- if (Util.treatAsBindException(bind_ex)) {
- if (start_port==end_port) throw new BindException("No available port to bind to");
- if(bind_addr != null) {
- NetworkInterface nic=NetworkInterface.getByInetAddress(bind_addr);
- if(nic == null)
- throw new BindException("bind_addr " + bind_addr + " is not a valid interface");
- }
- start_port++;
- continue;
- } else {
- //not a manifestation of BindException, handle like IOException
- if(log.isErrorEnabled()) log.error(ExternalStrings.ConnectionTable_EXCEPTION_IS__0, bind_ex);
- }
- }
- catch(IOException io_ex) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.ConnectionTable_EXCEPTION_IS__0, io_ex);
- }
- srv_port=start_port;
- break;
- }
- return ret;
- }
-
-
- void notifyConnectionOpened(Address peer) {
- if(peer == null) return;
- for(int i=0; i < conn_listeners.size(); i++)
- ((ConnectionListener)conn_listeners.elementAt(i)).connectionOpened(peer);
- }
-
- void notifyConnectionClosed(Address peer) {
- if(peer == null) return;
- for(int i=0; i < conn_listeners.size(); i++)
- ((ConnectionListener)conn_listeners.elementAt(i)).connectionClosed(peer);
- }
-
-
- void addConnection(Address peer, Connection c) {
- synchronized (conns) { // GemStoneAddition
- conns.put(peer, c);
- }
- if(reaper != null && !reaper.isRunning())
- reaper.start();
- }
-
-
-
-
- class Connection implements Runnable {
- Socket sock=null; // socket to/from peer (result of srv_sock.accept() or new Socket())
- String sock_addr=null; // used for Thread.getName()
- DataOutputStream out=null; // for sending messages
- DataInputStream in=null; // for receiving messages
- Thread receiverThread=null; // thread for receiving messages // GemStoneAddition - accesses synchronized via this
- Address peer_addr=null; // address of the 'other end' of the connection
- final Object send_mutex=new Object(); // serialize sends
- long last_access=System.currentTimeMillis(); // last time a message was sent or received
-
- /** Queue of byte[] of data to be sent to the peer of this connection */
- Queue send_queue=new Queue();
- Sender sender=new Sender();
-
-
- protected String getSockAddress() {
- if(sock_addr != null)
- return sock_addr;
- if(sock != null) {
- StringBuffer sb=new StringBuffer();
- sb.append(sock.getLocalAddress().getHostAddress()).append(':').append(sock.getLocalPort());
- sb.append(" - ").append(sock.getInetAddress().getHostAddress()).append(':').append(sock.getPort());
- sock_addr=sb.toString();
- }
- return sock_addr;
- }
-
-
-
-
- Connection(Socket s, Address peer_addr) {
- sock=s;
- this.peer_addr=peer_addr;
- try {
- // out=new DataOutputStream(sock.getOutputStream());
- // in=new DataInputStream(sock.getInputStream());
-
- // The change to buffered input and output stream yielded a 400% performance gain !
- // bela Sept 7 2006
- out=new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));
- in=new DataInputStream(new BufferedInputStream(sock.getInputStream()));
-
- // in=new DataInputStream(sock.getInputStream());
- }
- catch(Exception ex) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.ConnectionTable_EXCEPTION_IS__0, ex);
- }
- }
-
-
- synchronized /* GemStoneAddition */ boolean established() {
- return receiverThread != null && receiverThread.isAlive() /* GemStoneAddition */;
- }
-
-
- void setPeerAddress(Address peer_addr) {
- this.peer_addr=peer_addr;
- }
-
- Address getPeerAddress() {return peer_addr;}
-
- void updateLastAccessed() {
- last_access=System.currentTimeMillis();
- }
-
- synchronized /* GemStoneAddition */ void init() {
- // if(log.isInfoEnabled()) log.info("connection was created to " + peer_addr);
- if(receiverThread == null || !receiverThread.isAlive()) {
- // Roland Kurmann 4/7/2003, put in thread_group
- receiverThread=new Thread(thread_group, this, "ConnectionTable.Connection.Receiver [" + getSockAddress() + "]");
- receiverThread.setDaemon(true);
- receiverThread.start();
- if(log.isTraceEnabled())
- log.trace("ConnectionTable.Connection.Receiver started");
- }
- }
-
-
- synchronized /* GemStoneAddition */ void destroy() {
- closeSocket(); // should terminate handler as well
- sender.stop();
- if (receiverThread != null) receiverThread.interrupt(); // GemStoneAddition
- receiverThread=null;
- }
-
-
- void send(byte[] data, int offset, int length) {
- if(use_send_queues) {
- try {
- if(data != null) {
- // we need to copy the byte[] buffer here because the original buffer might get changed
- // in the meantime
- byte[] tmp=new byte[length];
- System.arraycopy(data, offset, tmp, 0, length);
- send_queue.add(tmp);}
- else {
- send_queue.add(NULL_DATA);
- }
- if(!sender.isRunning())
- sender.start();
- }
- catch(QueueClosedException e) {
- log.error(ExternalStrings.ConnectionTable_FAILED_ADDING_MESSAGE_TO_SEND_QUEUE, e);
- }
- }
- else
- _send(data, offset, length);
- }
-
-
- protected/*GemStoneAddition*/ void _send(byte[] data, int offset, int length) {
- synchronized(send_mutex) {
- try {
- doSend(data, offset, length);
- updateLastAccessed();
- }
- catch(IOException io_ex) {
- if(log.isWarnEnabled())
- log.warn("peer closed connection, trying to re-establish connection and re-send msg");
- try {
- doSend(data, offset, length);
- updateLastAccessed();
- }
- catch(IOException io_ex2) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.ConnectionTable_2ND_ATTEMPT_TO_SEND_DATA_FAILED_TOO);
- }
- catch(Exception ex2) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.ConnectionTable_EXCEPTION_IS__0, ex2);
- }
- }
- catch(Throwable ex) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.ConnectionTable_EXCEPTION_IS__0, ex);
- }
- }
- }
-
-
- void doSend(byte[] data, int offset, int length) throws Exception {
- try {
- // we're using 'double-writes', sending the buffer to the destination in 2 pieces. this would
- // ensure that, if the peer closed the connection while we were idle, we would get an exception.
- // this won't happen if we use a single write (see Stevens, ch. 5.13).
- if(out != null) {
- out.writeInt(length); // write the length of the data buffer first
- Util.doubleWrite(data, offset, length, out);
- out.flush(); // may not be very efficient (but safe)
- }
- }
- catch(Exception ex) {
- if(log.isErrorEnabled())
- log.error(ExternalStrings.ConnectionTable_FAILURE_SENDING_TO__0, peer_addr, ex);
- remove(peer_addr);
- throw ex;
- }
- }
-
-
- /**
- * Reads the peer's address. First a cookie has to be sent which has to match my own cookie, otherwise
- * the connection will be refused
- */
- Address readPeerAddress(Socket client_sock) throws Exception {
- Address client_peer_addr=null;
- byte[] input_cookie=new byte[cookie.length];
- int client_port=client_sock != null? client_sock.getPort() : 0;
- short version;
- InetAddress client_addr=client_sock != null? client_sock.getInetAddress() : null;
-
- if(in != null) {
- initCookie(input_cookie);
-
- // read the cookie first
- if (input_cookie.length != in.read(input_cookie, 0, input_cookie.length)) {
- throw new SocketException("Failed to read input cookie"); // GemStoneAddition
- }
- if(!matchCookie(input_cookie))
- throw new SocketException("ConnectionTable.Connection.readPeerAddress(): cookie sent by " +
- client_peer_addr + " does not match own cookie; terminating connection");
- // then read the version
- version=in.readShort();
-
- if(JGroupsVersion.compareTo(version) == false) {
- if(log.isWarnEnabled())
- log.warn(new StringBuffer("packet from ").append(client_addr).append(':').append(client_port).
- append(" has different version (").append(version).append(") from ours (").
- append(JGroupsVersion.version).append("). This may cause problems"));
- }
- client_peer_addr=new IpAddress();
- client_peer_addr.readFrom(in);
-
- updateLastAccessed();
- }
- return client_peer_addr;
- }
-
-
- /**
- * Send the cookie first, then the our port number. If the cookie doesn't match the receiver's cookie,
- * the receiver will reject the connection and close it.
- */
- void sendLocalAddress(Address local_addr) {
- if(local_addr == null) {
- if(log.isWarnEnabled()) log.warn("local_addr is null");
- return;
- }
- if(out != null) {
- try {
- // write the cookie
- out.write(cookie, 0, cookie.length);
-
- // write the version
- out.writeShort(JGroupsVersion.version);
- local_addr.writeTo(out);
- out.flush(); // needed ?
- updateLastAccessed();
- }
- catch(Throwable t) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.ConnectionTable_EXCEPTION_IS__0, t);
- }
- }
- }
-
-
- void initCookie(byte[] c) {
- if(c != null)
- for(int i=0; i < c.length; i++)
- c[i]=0;
- }
-
- boolean matchCookie(byte[] input) {
- if(input == null || input.length < cookie.length) return false;
- if(log.isInfoEnabled()) log.info(ExternalStrings.ConnectionTable_INPUT_COOKIE_IS__0, printCookie(input));
- for(int i=0; i < cookie.length; i++)
- if(cookie[i] != input[i]) return false;
- return true;
- }
-
-
- String printCookie(byte[] c) {
- if(c == null) return "";
- return new String(c);
- }
-
-
- public void run() {
- byte[] buf=new byte[256]; // start with 256, increase as we go
- int len=0;
-
- for (;;) { // GemStoneAddition -- remove coding anti-pattern
- if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition
- try {
- if(in == null) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.ConnectionTable_INPUT_STREAM_IS_NULL_);
- break;
- }
- len=in.readInt();
- if(len > buf.length)
- buf=new byte[len];
- in.readFully(buf, 0, len);
- updateLastAccessed();
- receive(peer_addr, buf, 0, len); // calls receiver.receive(msg)
- }
- catch(OutOfMemoryError mem_ex) {
- if(log.isWarnEnabled()) log.warn("dropped invalid message, closing connection");
- break; // continue;
- }
- catch(EOFException eof_ex) { // peer closed connection
- if(log.isInfoEnabled()) log.info(ExternalStrings.ConnectionTable_EXCEPTION_IS__0, eof_ex);
- notifyConnectionClosed(peer_addr);
- break;
- }
- catch(IOException io_ex) {
- if(log.isInfoEnabled()) log.info(ExternalStrings.ConnectionTable_EXCEPTION_IS__0, io_ex);
- notifyConnectionClosed(peer_addr);
- break;
- }
- catch(Throwable e) {
- if(log.isWarnEnabled()) log.warn("exception is " + e);
- }
- }
- if(log.isTraceEnabled())
- log.trace("ConnectionTable.Connection.Receiver terminated");
-// receiverThread=null; GemStoneAddition
- closeSocket();
- remove(peer_addr);
- }
-
-
- @Override // GemStoneAddition
- public String toString() {
- StringBuffer ret=new StringBuffer();
- InetAddress local=null, remote=null;
- String local_str, remote_str;
-
- if(sock == null)
- ret.append("<null socket>");
- else {
- //since the sock variable gets set to null we want to make
- //make sure we make it through here without a nullpointer exception
- Socket tmp_sock=sock;
- local=tmp_sock.getLocalAddress();
- remote=tmp_sock.getInetAddress();
- local_str=local != null ? Util.shortName(local) : "<null>";
- remote_str=remote != null ? Util.shortName(remote) : "<null>";
- ret.append('<' + local_str + ':' + tmp_sock.getLocalPort() +
- " --> " + remote_str + ':' + tmp_sock.getPort() + "> (" +
- ((System.currentTimeMillis() - last_access) / 1000) + " secs old)");
- tmp_sock=null;
- }
-
- return ret.toString();
- }
-
-
- void closeSocket() {
- if(sock != null) {
- try {
- sock.close(); // should actually close in/out (so we don't need to close them explicitly)
- }
- catch(Exception e) {
- }
- sock=null;
- }
- if(out != null) {
- try {
- out.close(); // flushes data
- }
- catch(Exception e) {
- }
- // removed 4/22/2003 (request by Roland Kurmann)
- // out=null;
- }
- if(in != null) {
- try {
- in.close();
- }
- catch(Exception ex) {
- }
- in=null;
- }
- }
-
-
- class Sender implements Runnable {
- Thread senderThread; // GemStoneAddition - synchronized on this
-// private boolean running=false; GemStoneAddition remove coding anti-pattern
-
- synchronized /* GemStoneAddition */ void start() {
- if(senderThread == null || !senderThread.isAlive()) {
- senderThread=new Thread(thread_group, this, "ConnectionTable.Connection.Sender [" + getSockAddress() + "]");
- senderThread.setDaemon(true);
- senderThread.start();
-// running=true; GemStoneAddition
- if(log.isTraceEnabled())
- log.trace("ConnectionTable.Connection.Sender thread started");
- }
- }
-
- synchronized /* GemStoneAddition */ void stop() {
- if(senderThread != null) {
- senderThread.interrupt();
- senderThread=null;
-// running=false; GemStoneAddition
- }
- }
-
- synchronized /* GemStoneAddition */ boolean isRunning() {
- return /* running && */ senderThread != null && senderThread.isAlive() /* GemStoneAddition */;
- }
-
- public void run() {
- byte[] data;
- for (;;) { // GemStoneAddition remove coding anti-pattern
- if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition
- try {
- data=(byte[])send_queue.remove();
- if(data == null)
- continue;
- _send(data, 0, data.length);
- }
- catch (InterruptedException ie) { // GemStoneAddition
- // no need to reset the bit; we're exiting
- break; // exit loop and thread
- }
- catch(QueueClosedException e) {
- break;
- }
- }
-// running=false; GemStoneAddition
- if(log.isTraceEnabled())
- log.trace("ConnectionTable.Connection.Sender thread terminated");
- }
- }
-
-
- }
-
-
- class Reaper implements Runnable {
- Thread t=null; // GemStoneAddition synchronize on this to access
-
- Reaper() {
- ;
- }
-
- public void start() {
- if(conns.size() == 0)
- return;
- synchronized (this) { // GemStoneAddition
-// if(t != null && !t.isAlive())
-// t=null; GemStoneAddition
- if(t == null || !t.isAlive()) {
- //RKU 7.4.2003, put in threadgroup
- t=new Thread(thread_group, this, "ConnectionTable.ReaperThread");
- t.setDaemon(true); // will allow us to terminate if all remaining threads are daemons
- t.start();
- }
- }
- }
-
- public void stop() {
- synchronized (this) { // GemStoneAddition
- if(t != null) {
- t.interrupt(); // GemStoneAddition
- t=null;
- }
- }
- }
-
-
- synchronized /* GemStoneAddition */ public boolean isRunning() {
- return t != null && t.isAlive() /* GemStoneAddition */;
- }
-
- public void run() {
- Connection value;
- Map.Entry entry;
- long curr_time;
-
- if(log.isInfoEnabled()) log.info("connection reaper thread was started. Number of connections=" +
- conns.size() + ", reaper_interval=" + reaper_interval + ", conn_expire_time=" +
- conn_expire_time);
-
- for (;;) { // GemStoneAddition remove coding anti-pattern
- // first sleep
-// if (conns.size() == 0) break; // GemStoneAddition but needs to be synchronized
- try { // GemStoneAddition
- Util.sleep(reaper_interval);
- }
- catch (InterruptedException e) {
- // Thread.currentThread().interrupt(); not needed; we're exiting
- break; // exit loop and thread
- }
- synchronized(conns) {
- if (conns.size() == 0) break; // GemStoneAddition
- curr_time=System.currentTimeMillis();
- for(Iterator it=conns.entrySet().iterator(); it.hasNext();) {
- entry=(Map.Entry)it.next();
- value=(Connection)entry.getValue();
- if(log.isInfoEnabled()) log.info("connection is " +
- ((curr_time - value.last_access) / 1000) + " seconds old (curr-time=" +
- curr_time + ", last_access=" + value.last_access + ')');
- if(value.last_access + conn_expire_time < curr_time) {
- if(log.isInfoEnabled()) log.info("connection " + value +
- " has been idle for too long (conn_expire_time=" + conn_expire_time +
- "), will be removed");
- value.destroy();
- it.remove();
- }
- }
- }
- }
- if(log.isInfoEnabled()) log.info(ExternalStrings.ConnectionTable_REAPER_TERMINATED);
-// t=null; GemStoneAddition
- }
- }
-
-
-}
-