You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/08/21 23:22:40 UTC

[16/51] [partial] incubator-geode git commit: GEODE-77 removing the old jgroups subproject

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/UDP_NIO.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/UDP_NIO.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/UDP_NIO.java
deleted file mode 100644
index bdcecba..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/UDP_NIO.java
+++ /dev/null
@@ -1,1567 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-package com.gemstone.org.jgroups.protocols;
-
-import com.gemstone.org.jgroups.util.GemFireTracer;
-import com.gemstone.org.jgroups.*;
-import com.gemstone.org.jgroups.stack.IpAddress;
-import com.gemstone.org.jgroups.stack.LogicalAddress;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.Queue;
-import com.gemstone.org.jgroups.util.QueueClosedException;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-/**
- * Multicast transport. Similar to UDP, but binds to multiple (or all) interfaces for sending and receiving
- * multicast and unicast traffic.<br/>
- * The list of interfaces can be set via a property (comma-delimited list of IP addresses or "all" for all
- * interfaces). Note that this class only works under JDK 1.4 and higher.<br/>
- * For each of the interfaces listed we create a Listener, which listens on the group multicast address and creates
- * a unicast datagram socket. The address of this member is determined at startup time, and is the host name plus
- * a timestamp (LogicalAddress). It does not change during the lifetime of the process. The LogicalAddress contains
- * a list of all unicast socket addresses to which we can send back unicast messages. When we send a message, the
- * Listener adds the sender's return address. When we receive a message, we add that address to our routing cache, which
- * contains logical addresses and physical addresses. When we need to send a unicast address, we first check whether
- * the logical address has a physical address associated with it in the cache. If so, we send a message to that address.
- * If not, we send the unicast message to <em>all</em> physical addresses contained in the LogicalAddress.<br/>
- * UDP_NIO guarantees that - in scenarios with multiple subnets and multi-homed machines - members do see each other.
- * There is some overhead in multicasting the same message on multiple interfaces, and potentially sending a unicast
- * on multiple interfaces as well, but the advantage is that we don't need stuff like bind_addr any longer. Plus,
- * the unicast routing caches should ensure that unicasts are only sent via 1 interface in almost all cases.
- * 
- * @author Bela Ban Oct 2003
- * @version $Id: UDP_NIO.java,v 1.4 2005/08/11 12:43:47 belaban Exp $
- */
-public class UDP_NIO extends Protocol implements Receiver {
-
-    static final String name="UDP_NIO";
-
-    /** Maintains a list of Connectors, one for each interface we're listening on */
-    ConnectorTable ct=null;
-
-    /** A List<String> of bind addresses, we create 1 Connector for each interface */
-    List bind_addrs=null;
-
-    /** The name of the group to which this member is connected */
-    String group_name=null;
-
-    /** The multicast address (mcast address and port) this member uses (default: 230.1.2.3:7500) */
-    InetSocketAddress mcast_addr=null;
-
-    /** The address of this member. Valid for the lifetime of the JVM in which this member runs */
-    LogicalAddress local_addr=new LogicalAddress(null, null);
-
-    /** Logical address without list of physical addresses */
-    LogicalAddress local_addr_canonical=local_addr.copy();
-
-    /** Pre-allocated byte stream. Used for serializing datagram packets */
-    ByteArrayOutputStream out_stream=new ByteArrayOutputStream(65535);
-
-    /**
-     * The port to which the unicast receiver socket binds.
-     * 0 means to bind to any (ephemeral) port
-     */
-    int local_bind_port=0;
-    int port_range=1; // 27-6-2003 bgooren, Only try one port by default
-
-
-    /**
-     * Whether to enable IP multicasting. If false, multiple unicast datagram
-     * packets are sent rather than one multicast packet
-     */
-    boolean ip_mcast=true;
-
-    /** The time-to-live (TTL) for multicast datagram packets */
-    int ip_ttl=32;
-
-    /** The members of this group (updated when a member joins or leaves) */
-    Vector members=new Vector();
-
-    /**
-     * Header to be added to all messages sent via this protocol. It is
-     * preallocated for efficiency
-     */
-    UdpHeader udp_hdr=null;
-
-    /** Send buffer size of the multicast datagram socket */
-    int mcast_send_buf_size=300000;
-
-    /** Receive buffer size of the multicast datagram socket */
-    int mcast_recv_buf_size=300000;
-
-    /** Send buffer size of the unicast datagram socket */
-    int ucast_send_buf_size=300000;
-
-    /** Receive buffer size of the unicast datagram socket */
-    int ucast_recv_buf_size=300000;
-
-    /**
-     * If true, messages sent to self are treated specially: unicast messages are
-     * looped back immediately, multicast messages get a local copy first and -
-     * when the real copy arrives - it will be discarded. Useful for Window
-     * media (non)sense
-     * @deprecated This is used by default now
-     */
-//    boolean loopback=true; //todo: remove GemStoneAddition(omitted)
-
-    /**
-     * Sometimes receivers are overloaded (they have to handle de-serialization etc).
-     * Packet handler is a separate thread taking care of de-serialization, receiver
-     * thread(s) simply put packet in queue and return immediately. Setting this to
-     * true adds one more thread
-     */
-    boolean use_packet_handler=false;
-
-    /** Used by packet handler to store incoming DatagramPackets */
-    Queue packet_queue=null;
-
-    /**
-     * If set it will be added to <tt>local_addr</tt>. Used to implement
-     * for example transport independent addresses
-     */
-    byte[] additional_data=null;
-
-    /**
-     * Dequeues DatagramPackets from packet_queue, unmarshalls them and
-     * calls <tt>handleIncomingUdpPacket()</tt>
-     */
-    PacketHandler packet_handler=null;
-
-
-    /** Number of bytes to allocate to receive a packet. Needs to be set to be higher than frag_size
-     * (handle CONFIG event)
-     */
-    static final int DEFAULT_RECEIVE_BUFFER_SIZE=120000;  // todo: make settable and/or use CONFIG event
-
-
-
-
-    /**
-     * Creates the UDP_NIO protocol, and initializes the
-     * state variables, does however not start any sockets or threads.
-     */
-    public UDP_NIO() {
-    }
-
-    /**
-     * debug only
-     */
-    @Override // GemStoneAddition  
-    public String toString() {
-        return "Protocol UDP(local address: " + local_addr + ')';
-    }
-
-
-    public void receive(DatagramPacket packet) {
-        int           len=packet.getLength();
-        byte[]        data=packet.getData();
-        SocketAddress sender=packet.getSocketAddress();
-
-        if(len == 4) {  // received a diagnostics probe
-            if(data[0] == 'd' && data[1] == 'i' && data[2] == 'a' && data[3] == 'g') {
-                handleDiagnosticProbe(sender);
-                return;
-            }
-        }
-
-        if(trace)
-            log.trace("received " + len + " bytes from " + sender);
-
-        if(use_packet_handler && packet_queue != null) {
-            byte[] tmp=new byte[len];
-            System.arraycopy(data, 0, tmp, 0, len);
-            try {
-                Object[] arr=new Object[]{tmp, sender};
-                packet_queue.add(arr);
-                return;
-            }
-            catch(QueueClosedException e) {
-                if(warn) log.warn("packet queue for packet handler thread is closed");
-                // pass through to handleIncomingPacket()
-            }
-        }
-
-        handleIncomingUdpPacket(data, sender);
-    }
-
-
-    /* ----------------------- Receiving of MCAST UDP packets ------------------------ */
-
-//    public void run() {
-//        DatagramPacket packet;
-//        byte receive_buf[]=new byte[65000];
-//        int len;
-//        byte[] tmp1, tmp2;
-//
-//        // moved out of loop to avoid excessive object creations (bela March 8 2001)
-//        packet=new DatagramPacket(receive_buf, receive_buf.length);
-//
-//        while(mcast_receiver != null && mcast_sock != null) {
-//            try {
-//                packet.setData(receive_buf, 0, receive_buf.length);
-//                mcast_sock.receive(packet);
-//                len=packet.getLength();
-//                if(len == 1 && packet.getData()[0] == 0) {
-//                    if(trace) if(log.isInfoEnabled()) log.info("UDP_NIO.run()", "received dummy packet");
-//                    continue;
-//                }
-//
-//                if(len == 4) {  // received a diagnostics probe
-//                    byte[] tmp=packet.getData();
-//                    if(tmp[0] == 'd' && tmp[1] == 'i' && tmp[2] == 'a' && tmp[3] == 'g') {
-//                        handleDiagnosticProbe(null, null);
-//                        continue;
-//                    }
-//                }
-//
-//                if(trace)
-//                    if(log.isInfoEnabled()) log.info("UDP_NIO.receive()", "received (mcast) " + packet.getLength() + " bytes from " +
-//                            packet.getAddress() + ":" + packet.getPort() + " (size=" + len + " bytes)");
-//                if(len > receive_buf.length) {
-//                    if(log.isErrorEnabled()) log.error("UDP_NIO.run()", "size of the received packet (" + len + ") is bigger than " +
-//                            "allocated buffer (" + receive_buf.length + "): will not be able to handle packet. " +
-//                            "Use the FRAG protocol and make its frag_size lower than " + receive_buf.length);
-//                }
-//
-//                if(Version.compareTo(packet.getData()) == false) {
-//                    if(warn) log.warn("UDP_NIO.run()",
-//                            "packet from " + packet.getAddress() + ":" + packet.getPort() +
-//                            " has different version (" +
-//                            Version.printVersionId(packet.getData(), Version.version_id.length) +
-//                            ") from ours (" + Version.printVersionId(Version.version_id) +
-//                            "). This may cause problems");
-//                }
-//
-//                if(use_packet_handler) {
-//                    tmp1=packet.getData();
-//                    tmp2=new byte[len];
-//                    System.arraycopy(tmp1, 0, tmp2, 0, len);
-//                    packet_queue.add(tmp2);
-//                } else
-//                    handleIncomingUdpPacket(packet.getData());
-//            } catch(SocketException sock_ex) {
-//                 if(log.isInfoEnabled()) log.info("UDP_NIO.run()", "multicast socket is closed, exception=" + sock_ex);
-//                break;
-//            } catch(InterruptedIOException io_ex) { // thread was interrupted
-//                ; // go back to top of loop, where we will terminate loop
-//            } catch(Throwable ex) {
-//                if(log.isErrorEnabled()) log.error("UDP_NIO.run()", "exception=" + ex + ", stack trace=" + Util.printStackTrace(ex));
-//                Util.sleep(1000); // so we don't get into 100% cpu spinning (should NEVER happen !)
-//            }
-//        }
-//         if(log.isInfoEnabled()) log.info("UDP_NIO.run()", "multicast thread terminated");
-//    }
-
-    void handleDiagnosticProbe(SocketAddress sender) {
-        try {
-            byte[] diag_rsp=getDiagResponse().getBytes();
-            DatagramPacket rsp=new DatagramPacket(diag_rsp, 0, diag_rsp.length, sender);
-
-                if(log.isInfoEnabled()) log.info(ExternalStrings.UDP_NIO_SENDING_DIAG_RESPONSE_TO__0, sender);
-            ct.send(rsp);
-        } 
-        catch(Exception t) {
-            if(log.isErrorEnabled()) log.error(ExternalStrings.UDP_NIO_FAILED_SENDING_DIAG_RSP_TO__0__EXCEPTION_1, new Object[] {sender, t});
-        }
-    }
-
-    String getDiagResponse() {
-        StringBuffer sb=new StringBuffer();
-        sb.append(local_addr).append(" (").append(group_name).append(')');
-        sb.append(" [").append(mcast_addr).append("]\n");
-        sb.append("Version=").append(JGroupsVersion.description).append(", cvs=\"").append(JGroupsVersion.cvs).append("\"\n");
-        sb.append("physical addresses: ").append(local_addr.getPhysicalAddresses()).append('\n');
-        sb.append("members: ").append(members).append('\n');
-
-        return sb.toString();
-    }
-
-    /* ------------------------------------------------------------------------------- */
-
-
-
-    /*------------------------------ Protocol interface ------------------------------ */
-
-    @Override // GemStoneAddition  
-    public String getName() {
-        return name;
-    }
-
-
-    @Override // GemStoneAddition  
-    public void init() throws Exception {
-        if(use_packet_handler) {
-            packet_queue=new Queue();
-            packet_handler=new PacketHandler();
-        }
-    }
-
-
-    /**
-     * Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads
-     */
-    @Override // GemStoneAddition  
-    public void start() throws Exception {
-         if(log.isInfoEnabled()) log.info(ExternalStrings.UDP_NIO_CREATING_SOCKETS_AND_STARTING_THREADS);
-        if(ct == null) {
-            ct=new ConnectorTable(mcast_addr, DEFAULT_RECEIVE_BUFFER_SIZE, mcast_recv_buf_size, ip_mcast, this);
-
-            for(Iterator it=bind_addrs.iterator(); it.hasNext();) {
-                String bind_addr=(String)it.next();
-                ct.listenOn(bind_addr, local_bind_port, port_range, DEFAULT_RECEIVE_BUFFER_SIZE, ucast_recv_buf_size,
-                        ucast_send_buf_size, ip_ttl, this);
-            }
-
-            // add physical addresses to local_addr
-            List physical_addrs=ct.getConnectorAddresses(); // must be non-null and size() >= 1
-            for(Iterator it=physical_addrs.iterator(); it.hasNext();) {
-                SocketAddress address=(SocketAddress)it.next();
-                local_addr.addPhysicalAddress(address);
-            }
-
-            if(additional_data != null)
-                local_addr.setAdditionalData(additional_data);
-
-            ct.start();
-
-            passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
-            if(use_packet_handler)
-            packet_handler.start();
-        }
-    }
-
-
-    @Override // GemStoneAddition  
-    public void stop() {
-         if(log.isInfoEnabled()) log.info(ExternalStrings.UDP_NIO_CLOSING_SOCKETS_AND_STOPPING_THREADS);
-        if(packet_handler != null)
-            packet_handler.stop();
-        if(ct != null) {
-            ct.stop();
-            ct=null;
-        }
-        local_addr.removeAllPhysicalAddresses();
-    }
-
-
-    /**
-     * Setup the Protocol instance acording to the configuration string.
-     * The following properties are being read by the UDP protocol:
-     * <ul>
-     * <li> param mcast_addr - the multicast address to use default is 224.0.0.200
-     * <li> param mcast_port - (int) the port that the multicast is sent on default is 7500
-     * <li> param ip_mcast - (boolean) flag whether to use IP multicast - default is true
-     * <li> param ip_ttl - Set the default time-to-live for multicast packets sent out on this socket. default is 32
-     * </ul>
-     * @return true if no other properties are left.
-     *         false if the properties still have data in them, ie ,
-     *         properties are left over and not handled by the protocol stack
-     */
-    @Override // GemStoneAddition  
-    public boolean setProperties(Properties props) {
-        String  str;
-        List    exclude_list=null;
-        String  mcast_addr_name="230.8.8.8";
-        int     mcast_port=7500;
-
-        super.setProperties(props);
-        str=props.getProperty("bind_addrs");
-        if(str != null) {
-            str=str.trim();
-            if("all".equals(str.toLowerCase())) {
-                try {
-                    bind_addrs=determineAllBindInterfaces();
-                }
-                catch(SocketException e) {
-                    e.printStackTrace();
-                    bind_addrs=null;
-                }
-            }
-            else {
-                bind_addrs=Util.parseCommaDelimitedStrings(str);
-            }
-            props.remove("bind_addrs");
-        }
-
-        str=props.getProperty("bind_addrs_exclude");
-        if(str != null) {
-            str=str.trim();
-            exclude_list=Util.parseCommaDelimitedStrings(str);
-            props.remove("bind_addrs_exclude");
-        }
-
-        str=props.getProperty("bind_port");
-        if(str != null) {
-            local_bind_port=Integer.parseInt(str);
-            props.remove("bind_port");
-        }
-
-        str=props.getProperty("start_port");
-        if(str != null) {
-            local_bind_port=Integer.parseInt(str);
-            props.remove("start_port");
-        }
-
-        str=props.getProperty("port_range");
-        if(str != null) {
-            port_range=Integer.parseInt(str);
-            props.remove("port_range");
-        }
-
-        str=props.getProperty("mcast_addr");
-        if(str != null) {
-            mcast_addr_name=str;
-            props.remove("mcast_addr");
-        }
-
-        str=props.getProperty("mcast_port");
-        if(str != null) {
-            mcast_port=Integer.parseInt(str);
-            props.remove("mcast_port");
-        }
-
-        str=props.getProperty("ip_mcast");
-        if(str != null) {
-            ip_mcast=Boolean.valueOf(str).booleanValue();
-            props.remove("ip_mcast");
-        }
-
-        str=props.getProperty("ip_ttl");
-        if(str != null) {
-            ip_ttl=Integer.parseInt(str);
-            props.remove("ip_ttl");
-        }
-
-        str=props.getProperty("mcast_send_buf_size");
-        if(str != null) {
-            mcast_send_buf_size=Integer.parseInt(str);
-            props.remove("mcast_send_buf_size");
-        }
-
-        str=props.getProperty("mcast_recv_buf_size");
-        if(str != null) {
-            mcast_recv_buf_size=Integer.parseInt(str);
-            props.remove("mcast_recv_buf_size");
-        }
-
-        str=props.getProperty("ucast_send_buf_size");
-        if(str != null) {
-            ucast_send_buf_size=Integer.parseInt(str);
-            props.remove("ucast_send_buf_size");
-        }
-
-        str=props.getProperty("ucast_recv_buf_size");
-        if(str != null) {
-            ucast_recv_buf_size=Integer.parseInt(str);
-            props.remove("ucast_recv_buf_size");
-        }
-
-        str=props.getProperty("use_packet_handler");
-        if(str != null) {
-            use_packet_handler=Boolean.valueOf(str).booleanValue();
-            props.remove("use_packet_handler");
-        }
-
-
-        // determine mcast_addr
-        mcast_addr=new InetSocketAddress(mcast_addr_name, mcast_port);
-
-        // handling of bind_addrs
-        if(bind_addrs == null)
-            bind_addrs=new ArrayList();
-        if(bind_addrs.size() == 0) {
-            try {
-                String default_bind_addr=determineDefaultBindInterface();
-                bind_addrs.add(default_bind_addr);
-            }
-            catch(SocketException ex) {
-                if(log.isErrorEnabled()) log.error(ExternalStrings.UDP_NIO_FAILED_DETERMINING_THE_DEFAULT_BIND_INTERFACE__0, ex);
-            }
-        }
-        if(exclude_list != null) {
-            bind_addrs.removeAll(exclude_list);
-        }
-        if(bind_addrs.size() == 0) {
-            if(log.isErrorEnabled()) log.error(ExternalStrings.UDP_NIO_NO_VALID_BIND_INTERFACE_FOUND_UNABLE_TO_LISTEN_FOR_NETWORK_TRAFFIC);
-            return false;
-        }
-        else {
-
-                if(log.isInfoEnabled()) log.info(ExternalStrings.UDP_NIO_BIND_INTERFACES_ARE__0, bind_addrs);
-        }
-
-        if(props.size() > 0) {
-            log.error(ExternalStrings.UDP_NIO_UDP_NIOSETPROPERTIES_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
-
-            return false;
-        }
-        return true;
-    }
-
-
-    /**
-     * This prevents the up-handler thread to be created, which essentially is superfluous:
-     * messages are received from the network rather than from a layer below.
-     * DON'T REMOVE ! 
-     */
-    @Override // GemStoneAddition  
-    public void startUpHandler() {
-    }
-
-    /**
-     * handle the UP event.
-     * 
-     * @param evt - the event being send from the stack
-     */
-    @Override // GemStoneAddition  
-    public void up(Event evt) {
-        passUp(evt);
-
-        switch(evt.getType()) {
-
-            case Event.CONFIG:
-                passUp(evt);
-                if(log.isInfoEnabled()) log.info(ExternalStrings.UDP_NIO_RECEIVED_CONFIG_EVENT__0, (Object)evt.getArg());
-                handleConfigEvent((HashMap)evt.getArg());
-                return;
-        }
-
-        passUp(evt);
-    }
-
-    /**
-     * Caller by the layer above this layer. Usually we just put this Message
-     * into the send queue and let one or more worker threads handle it. A worker thread
-     * then removes the Message from the send queue, performs a conversion and adds the
-     * modified Message to the send queue of the layer below it, by calling Down).
-     */
-    @Override // GemStoneAddition  
-    public void down(Event evt) {
-        Message msg;
-        Object dest_addr;
-
-        if(evt.getType() != Event.MSG) {  // unless it is a message handle it and respond
-            handleDownEvent(evt);
-            return;
-        }
-
-        msg=(Message)evt.getArg();
-
-        if(udp_hdr != null && udp_hdr.channel_name != null) {
-            // added patch by Roland Kurmann (March 20 2003)
-            msg.putHeader(name, udp_hdr);
-        }
-
-        dest_addr=msg.getDest();
-
-        // Because we don't call Protocol.passDown(), we notify the observer directly (e.g. PerfObserver).
-        // This way, we still have performance numbers for UDP
-        if(observer != null)
-            observer.passDown(evt);
-
-        if(dest_addr == null) { // 'null' means send to all group members
-            if(ip_mcast == false) {
-                //sends a separate UDP message to each address
-                sendMultipleUdpMessages(msg, members);
-                return;
-            }
-        }
-
-        try {
-            sendUdpMessage(msg); // either unicast (dest != null) or multicast (dest == null)
-        }
-        catch(Exception e) {
-            if(log.isErrorEnabled()) log.error(ExternalStrings.UDP_NIO_EXCEPTION_0__MSG_1__MCAST_ADDR_2, new Object[] {e, msg, mcast_addr});
-        }
-    }
-
-
-
-
-
-
-    /*--------------------------- End of Protocol interface -------------------------- */
-
-
-    /* ------------------------------ Private Methods -------------------------------- */
-
-
-    void handleMessage(Message msg) {
-
-    }
-
-
-    /**
-     * Processes a packet read from either the multicast or unicast socket. Needs to be synchronized because
-     * mcast or unicast socket reads can be concurrent
-     */
-    void handleIncomingUdpPacket(byte[] data, SocketAddress sender) {
-        ByteArrayInputStream inp_stream;
-        ObjectInputStream    inp;
-        Message              msg=null;
-        UdpHeader            hdr=null;
-        Event                evt;
-        Address              dst, src;
-        short                version;
-
-        try {
-            // skip the first n bytes (default: 4), this is the version info
-            inp_stream=new ByteArrayInputStream(data);
-            inp=new ObjectInputStream(inp_stream);
-            version=inp.readShort();
-
-            if(JGroupsVersion.compareTo(version) == false) {
-                if(warn)
-                    log.warn("packet from " + sender + " has different version (" + version +
-                               ") from ours (" + JGroupsVersion.version + "). This may cause problems");
-            }
-
-            msg=new Message();
-            msg.readExternal(inp);
-            dst=msg.getDest();
-            src=msg.getSrc();
-            if(src == null) {
-                if(log.isErrorEnabled()) log.error(ExternalStrings.UDP_NIO_SENDERS_ADDRESS_IS_NULL);
-            }
-            else {
-                ((LogicalAddress)src).setPrimaryPhysicalAddress(sender);
-            }
-
-            // discard my own multicast loopback copy
-            if((dst == null || dst.isMulticastAddress()) && src != null && local_addr.equals(src)) {
-                if(trace)
-                    log.trace("discarded own loopback multicast packet");
-
-                // System.out.println("-- discarded " + msg.getObject());
-
-                return;
-            }
-
-            evt=new Event(Event.MSG, msg);
-            if(trace)
-                log.trace("Message is " + msg + ", headers are " + msg.getHeaders());
-
-            /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.
-             * This allows e.g. PerfObserver to get the time of reception of a message */
-            if(observer != null)
-                observer.up(evt, up_queue.size());
-
-            hdr=(UdpHeader)msg.removeHeader(name);
-        } 
-        catch(Exception e) {
-            if(log.isErrorEnabled()) log.error(ExternalStrings.UDP_NIO_EXCEPTION_0, Util.getStackTrace(e));
-            return;
-        }
-
-        if(hdr != null) {
-
-            /* Discard all messages destined for a channel with a different name */
-            String ch_name=null;
-
-            if(hdr.channel_name != null)
-                ch_name=hdr.channel_name;
-
-            // Discard if message's group name is not the same as our group name unless the
-            // message is a diagnosis message (special group name DIAG_GROUP)
-            // GemStoneAddition: now we support a different group name by
-            // transparently adjusting serialization and semantics for older
-            // versions till the min specified in Version.OLDEST_P2P_SUPPORTED
-            if(ch_name != null && group_name != null && !group_name.equals(ch_name) &&
-                    !ch_name.equals(Util.DIAG_GROUP)) {
-                    if(warn) log.warn("discarded message from different group (" +
-                            ch_name + "). Sender was " + msg.getSrc());
-                return;
-            }
-        }
-
-        passUp(evt);
-    }
-
-
-    /**
-     * Send a message to the address specified in dest
-     */
-    void sendUdpMessage(Message msg) throws Exception {
-        Address            dest, src;
-        ObjectOutputStream out;
-        byte               buf[];
-        DatagramPacket     packet;
-        Message            copy;
-        Event              evt; // for loopback messages
-
-        dest=msg.getDest();  // if non-null: unicast, else multicast
-        src=msg.getSrc();
-        if(src == null) {
-            src=local_addr_canonical; // no physical addresses present
-            msg.setSrc(src);
-        }
-
-        if(trace)
-            log.trace("sending message to " + msg.getDest() +
-                    " (src=" + msg.getSrc() + "), headers are " + msg.getHeaders());
-
-        // Don't send if destination is local address. Instead, switch dst and src and put in up_queue.
-        // If multicast message, loopback a copy directly to us (but still multicast). Once we receive this,
-        // we will discard our own multicast message
-        if(dest == null || dest.isMulticastAddress() || dest.equals(local_addr)) {
-            copy=msg.copy();
-            copy.removeHeader(name);
-            evt=new Event(Event.MSG, copy);
-
-            /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.
-               This allows e.g. PerfObserver to get the time of reception of a message */
-            if(observer != null)
-                observer.up(evt, up_queue.size());
-            if(trace) log.trace("looped back local message " + copy);
-
-            // System.out.println("\n-- passing up packet id=" + copy.getObject());
-            passUp(evt);
-            // System.out.println("-- passed up packet id=" + copy.getObject());
-
-            if(dest != null && !dest.isMulticastAddress())
-                return; // it is a unicast message to myself, no need to put on the network
-        }
-
-        out_stream.reset();
-        out=new ObjectOutputStream(out_stream);
-        out.writeShort(JGroupsVersion.version);
-        msg.writeExternal(out);
-        out.flush(); // needed if out buffers its output to out_stream
-        buf=out_stream.toByteArray();
-        packet=new DatagramPacket(buf, buf.length, mcast_addr);
-
-        //System.out.println("-- sleeping 4 secs");
-        // Thread.sleep(4000);
-
-
-        // System.out.println("\n-- sending packet " + msg.getObject());
-        ct.send(packet);
-        // System.out.println("-- sent " + msg.getObject());
-    }
-
-
-    void sendMultipleUdpMessages(Message msg, Vector dests) {
-        Address dest;
-
-        for(int i=0; i < dests.size(); i++) {
-            dest=(Address)dests.elementAt(i);
-            msg.setDest(dest);
-
-            try {
-                sendUdpMessage(msg);
-            }
-            catch(Exception e) {
-                if(log.isDebugEnabled()) log.debug("exception=" + e);
-            }
-        }
-    }
-
-
-
-
-
-//
-//    /**
-//     * Workaround for the problem encountered in certains JDKs that a thread listening on a socket
-//     * cannot be interrupted. Therefore we just send a dummy datagram packet so that the thread 'wakes up'
-//     * and realizes it has to terminate. Should be removed when all JDKs support Thread.interrupt() on
-//     * reads. Uses send_sock t send dummy packet, so this socket has to be still alive.
-//     *
-//     * @param dest The destination host. Will be local host if null
-//     * @param port The destination port
-//     */
-//    void sendDummyPacket(InetAddress dest, int port) {
-//        DatagramPacket packet;
-//        byte[] buf={0};
-//
-//        if(dest == null) {
-//            try {
-//                dest=InetAddress.getLocalHost();
-//            } catch(Exception e) {
-//            }
-//        }
-//
-//        if(trace) if(log.isInfoEnabled()) log.info("UDP_NIO.sendDummyPacket()", "sending packet to " + dest + ":" + port);
-//
-//        if(ucast_sock == null || dest == null) {
-//            if(warn) log.warn("UDP_NIO.sendDummyPacket()", "send_sock was null or dest was null, cannot send dummy packet");
-//            return;
-//        }
-//        packet=new DatagramPacket(buf, buf.length, dest, port);
-//        try {
-//            ucast_sock.send(packet);
-//        } catch(Throwable e) {
-//            if(log.isErrorEnabled()) log.error("UDP_NIO.sendDummyPacket()", "exception sending dummy packet to " +
-//                    dest + ":" + port + ": " + e);
-//        }
-//    }
-
-
-
-
-
-    void handleDownEvent(Event evt) {
-        switch(evt.getType()) {
-
-            case Event.TMP_VIEW:
-            case Event.VIEW_CHANGE:
-                synchronized(members) {
-                    members.removeAllElements();
-                    Vector tmpvec=((View)evt.getArg()).getMembers();
-                    for(int i=0; i < tmpvec.size(); i++)
-                        members.addElement(tmpvec.elementAt(i));
-                }
-                break;
-
-            case Event.GET_LOCAL_ADDRESS:   // return local address -> Event(SET_LOCAL_ADDRESS, local)
-                passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
-                break;
-
-            case Event.CONNECT:
-                group_name=(String)evt.getArg();
-                udp_hdr=new UdpHeader(group_name);
-
-                // removed March 18 2003 (bela), not needed (handled by GMS)
-                // changed July 2 2003 (bela): we discard CONNECT_OK at the GMS level anyway, this might
-                // be needed if we run without GMS though
-                passUp(new Event(Event.CONNECT_OK));
-                break;
-
-            case Event.DISCONNECT:
-                passUp(new Event(Event.DISCONNECT_OK));
-                break;
-
-            case Event.CONFIG:
-                if(log.isInfoEnabled()) log.info(ExternalStrings.UDP_NIO_RECEIVED_CONFIG_EVENT__0, (Object)evt.getArg());
-                handleConfigEvent((HashMap)evt.getArg());
-                break;
-        }
-    }
-
-
-    void handleConfigEvent(HashMap map) {
-        if(map == null) return;
-        if(map.containsKey("additional_data"))
-            additional_data=(byte[])map.get("additional_data");
-        if(map.containsKey("send_buf_size")) {
-            mcast_send_buf_size=((Integer)map.get("send_buf_size")).intValue();
-            ucast_send_buf_size=mcast_send_buf_size;
-        }
-        if(map.containsKey("recv_buf_size")) {
-            mcast_recv_buf_size=((Integer)map.get("recv_buf_size")).intValue();
-            ucast_recv_buf_size=mcast_recv_buf_size;
-        }
-    }
-
-
-    /** Return the first non-loopback interface */
-    public String determineDefaultBindInterface() throws SocketException {
-        for(Enumeration en=NetworkInterface.getNetworkInterfaces(); en.hasMoreElements();) {
-            NetworkInterface ni=(NetworkInterface)en.nextElement();
-            for(Enumeration en2=ni.getInetAddresses(); en2.hasMoreElements();) {
-                InetAddress bind_addr=(InetAddress)en2.nextElement();
-                if(!bind_addr.isLoopbackAddress()) {
-                    return bind_addr.getHostAddress();
-                }
-            }
-        }
-        return null;
-    }
-
-    public List determineAllBindInterfaces() throws SocketException {
-        List ret=new ArrayList();
-        for(Enumeration en=NetworkInterface.getNetworkInterfaces(); en.hasMoreElements();) {
-            NetworkInterface ni=(NetworkInterface)en.nextElement();
-            for(Enumeration en2=ni.getInetAddresses(); en2.hasMoreElements();) {
-                InetAddress bind_addr=(InetAddress)en2.nextElement();
-                ret.add(bind_addr.getHostAddress());
-            }
-        }
-
-        return ret;
-    }
-
-    /* ----------------------------- End of Private Methods ---------------------------------------- */
-
-
-
-    /* ----------------------------- Inner Classes ---------------------------------------- */
-
-
-    /**
-     * This thread fetches byte buffers from the packet_queue, converts them into messages and passes them up
-     * to the higher layer (done in handleIncomingUdpPacket()).
-     */
-    class PacketHandler implements Runnable {
-        Thread t=null;
-
-        public void run() {
-            byte[] data;
-            SocketAddress sender;
-
-            while(packet_queue != null && packet_handler != null) {
-                try {
-                    Object[] arr=(Object[])packet_queue.remove();
-                    data=(byte[])arr[0];
-                    sender=(SocketAddress)arr[1];
-                } catch (InterruptedException ie) { // GemStoneAddition
-                     if(log.isInfoEnabled()) log.info(ExternalStrings.UDP_NIO_PACKET_HANDLER_THREAD_TERMINATING);
-                    break; // exit loop and thread
-                } catch(QueueClosedException closed_ex) {
-                     if(log.isInfoEnabled()) log.info(ExternalStrings.UDP_NIO_PACKET_HANDLER_THREAD_TERMINATING);
-                    break;
-                }
-                handleIncomingUdpPacket(data, sender);
-                data=null; // let's give the poor garbage collector a hand...
-            }
-        }
-
-        void start() {
-            if(t == null) {
-                t=new Thread(this, "UDP_NIO.PacketHandler thread");
-                t.setDaemon(true);
-                t.start();
-            }
-        }
-
-        void stop() {
-            if(packet_queue != null)
-                packet_queue.close(false); // should terminate the packet_handler thread too
-            t=null;
-            packet_queue=null;
-        }
-    }
-
-
-
-
-    /**
-     * Manages a multicast and unicast socket on a given interface (NIC). The multicast socket is used
-     * to listen for incoming multicast packets, the unicast socket is used to (1) listen for incoming
-     * unicast packets, (2) to send unicast packets and (3) to send multicast packets
-     */
-    public static class Connector implements Runnable {
-
-        protected Thread t=null; // GemStoneAddition -- accesses synchronized on this
-
-        protected SenderThread sender_thread=null;
-
-        /** Interface on which ucast_sock and mcast_sender_sock are created */
-        NetworkInterface bind_interface;
-
-
-        /** Used for sending/receiving unicast/multicast packets. The reason we have to use a MulticastSocket versus a
-         * DatagramSocket is that only MulticastSockets allow to set the interface over which a multicast
-         * is sent: DatagramSockets consult the routing table to find the interface
-         */
-        MulticastSocket mcast_sock=null;
-
-        /** Local port of the mcast_sock */
-        SocketAddress localAddr=null;
-
-        /** The receiver which handles incoming packets */
-        Receiver receiver=null;
-
-        /** Buffer for incoming unicast packets */
-        protected byte[] receive_buffer=null;
-
-
-        Queue send_queue=new Queue();
-
-        static final GemFireTracer mylog=GemFireTracer.getLog(Connector.class);
-        static final boolean mywarn=mylog.isWarnEnabled();
-
-
-        class SenderThread extends Thread  {
-
-
-          @Override // GemStoneAddition  
-            public void run() {
-                Object[] arr;
-                byte[] buf;
-                SocketAddress dest;
-
-                while(send_queue != null) {
-                    try {
-                        arr=(Object[])send_queue.remove();
-                        buf=(byte[])arr[0];
-                        dest=(SocketAddress)arr[1];
-                        mcast_sock.send(new DatagramPacket(buf, buf.length, dest));
-                    }
-                    catch(QueueClosedException e) {
-                        break;
-                    }
-                    catch (InterruptedException ie) { // GemStoneAddition
-                        break; // exit loop and thread
-                    }
-                    catch(SocketException e) {
-                        e.printStackTrace();
-                    }
-                    catch(IOException e) {
-                        e.printStackTrace();
-                    }
-
-                }
-            }
-        }
-
-
-
-        public Connector(NetworkInterface bind_interface, int local_bind_port,
-                         int port_range,  int receive_buffer_size,
-                         int receive_sock_buf_size, int send_sock_buf_size,
-                         int ip_ttl, Receiver receiver) throws IOException {
-            this.bind_interface=bind_interface;
-            this.receiver=receiver;
-            this.receive_buffer=new byte[receive_buffer_size];
-
-            mcast_sock=createMulticastSocket(local_bind_port, port_range);
-
-            // changed Bela Dec 31 2003: if loopback is disabled other members on the same machine won't be able
-            // to receive our multicasts
-            // mcast_sock.setLoopbackMode(true); // we don't want to receive our own multicasts
-            mcast_sock.setReceiveBufferSize(receive_sock_buf_size);
-            mcast_sock.setSendBufferSize(send_sock_buf_size);
-            mcast_sock.setTimeToLive(ip_ttl);
-            System.out.println("ttl=" + mcast_sock.getTimeToLive());
-            mcast_sock.setNetworkInterface(this.bind_interface); // for outgoing multicasts
-            localAddr=mcast_sock.getLocalSocketAddress();
-            System.out.println("-- local_addr=" + localAddr);
-            System.out.println("-- mcast_sock: send_bufsize=" + mcast_sock.getSendBufferSize() +
-                    ", recv_bufsize=" + mcast_sock.getReceiveBufferSize());
-        }
-
-
-        public SocketAddress getLocalAddress() {
-            return localAddr;
-        }
-
-        public NetworkInterface getBindInterface() {
-            return bind_interface;
-        }
-
-        public void start() throws Exception {
-            if(mcast_sock == null)
-                throw new Exception("UDP_NIO.Connector.start(): connector has been stopped (start() cannot be called)");
-
-            synchronized (this) { // GemStoneAddition
-            if(t != null && t.isAlive()) {
-                if(mywarn) mylog.warn("connector thread is already running");
-                return;
-            }
-            t=new Thread(this, "ConnectorThread for " + localAddr);
-            }
-            
-            t.setDaemon(true);
-            t.start();
-
-            sender_thread=new SenderThread();
-            sender_thread.start();
-        }
-
-        /** Stops the connector. After this call, start() cannot be called, but a new connector has to
-         * be created
-         */
-        public void stop() {
-          // GemStoneAddition -- interrupt thread first before shutting
-          // down its socket, so that it will know that shutdown is occurring.
-            synchronized (this) { // GemStoneAddition
-              if (t != null) { // GemStoneAddition
-                t.interrupt();
-              }
-              t = null;
-            }
-            if(mcast_sock != null)
-              mcast_sock.close(); // terminates the thread if running
-            mcast_sock=null;
-        }
-
-
-
-        /** Sends a message using mcast_sock */
-        public void send(DatagramPacket packet) throws Exception {
-            //mcast_sock.send(packet);
-
-            byte[] buf=packet.getData().clone();
-            Object[] arr=new Object[]{buf, packet.getSocketAddress()};
-            send_queue.add(arr);
-        }
-
-        public void run() {
-            DatagramPacket packet=new DatagramPacket(receive_buffer, receive_buffer.length);
-            for (;;) { // GemStoneAddition remove variable anti-pattern
-              if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition
-                try {
-                    packet.setData(receive_buffer, 0, receive_buffer.length);
-                    ConnectorTable.receivePacket(packet, mcast_sock, receiver);
-                }
-                catch(Exception th) {
-                  if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition
-//                    if(th == null || mcast_sock == null || mcast_sock.isClosed())
-//                        break;
-                    if(mylog.isErrorEnabled()) mylog.error(ExternalStrings.UDP_NIO__0__EXCEPTION_1, new Object[] {localAddr, th});
-                    try { // GemStoneAddition
-                    Util.sleep(300); // so we don't get into 100% cpu spinning (should NEVER happen !)
-                    }
-                    catch (InterruptedException e) {
-                      break; // exit loop and thread
-                    }
-                }
-            }
-            t=null;
-        }
-
-
-
-
-        @Override // GemStoneAddition  
-        public String toString() {
-            StringBuffer sb=new StringBuffer();
-            sb.append("local_addr=").append(localAddr).append(", mcast_group=");
-            return sb.toString();
-        }
-
-
-
-
-
-        // 27-6-2003 bgooren, find available port in range (start_port, start_port+port_range)
-        private MulticastSocket createMulticastSocket(int local_bind_port, int port_range) throws IOException {
-            MulticastSocket sock=null;
-            int             tmp_port=local_bind_port;
-
-            int max_port=tmp_port + port_range;
-            while(tmp_port <= max_port) {
-                try {
-                    sock=new MulticastSocket(tmp_port);
-                    break;
-                }
-                catch(Exception bind_ex) {
-                    tmp_port++;
-                }
-            }
-            if(sock == null)
-                throw new IOException("could not create a MulticastSocket (port range: " + local_bind_port +
-                        " - " + (local_bind_port+port_range));
-            return sock;
-        }
-    }
-
-
-
-
-
-    /** Manages a bunch of Connectors */
-    public static class ConnectorTable implements Receiver, Runnable {
-
-        Thread t=null; // GemStoneAddition -- accesses synchronized on this
-
-        /** Socket to receive multicast packets. Will be bound to n interfaces */
-        MulticastSocket mcast_sock=null;
-
-        /** The multicast address which mcast_sock will join (e.g. 230.1.2.3:7500) */
-        InetSocketAddress mcastAddr=null;
-
-        Receiver receiver=null;
-
-        /** Buffer for incoming packets */
-        byte[] receive_buffer=null;
-
-        /** Vector<Connector>. A list of Connectors, one for each interface we listen on */
-        Vector connectors=new Vector();
-
-//        boolean running=false; GemStoneAddition non-volatile part of a coding anti-pattern
-
-        static final GemFireTracer mylog=GemFireTracer.getLog(ConnectorTable.class);
-        static final boolean mywarn=mylog.isWarnEnabled();
-
-
-
-
-        public ConnectorTable(InetSocketAddress mcast_addr,
-                              int receive_buffer_size, int receive_sock_buf_size,
-                              boolean ip_mcast, Receiver receiver) throws IOException {
-            this.receiver=receiver;
-            this.mcastAddr=mcast_addr;
-            this.receive_buffer=new byte[receive_buffer_size];
-
-            if(ip_mcast) {
-                mcast_sock=new MulticastSocket(mcast_addr.getPort());
-                // changed Bela Dec 31 2003: if loopback is disabled other members on the same machine won't be able
-                // to receive our multicasts
-                // mcast_sock.setLoopbackMode(true); // do not get own multicasts
-                mcast_sock.setReceiveBufferSize(receive_sock_buf_size);
-            }
-        }
-
-
-        public Receiver getReceiver() {
-            return receiver;
-        }
-
-        public void setReceiver(Receiver receiver) {
-            this.receiver=receiver;
-        }
-
-
-        /** Get all interfaces, create one Connector per interface and call start() on it */
-        public void start() throws Exception {
-            Connector tmp;
-//            if(running) GemStoneAddition
-//                return;
-
-            if(mcast_sock != null) {
-                // Start the thread servicing the incoming multicasts
-              synchronized (this) { // GemStoneAddition
-                if (t == null || !t.isAlive()) { // GemStoneAddition
-                  t=new Thread(this, "ConnectorTable thread");
-                  t.setDaemon(true);
-                  t.start();
-                }
-              }
-            }
-
-
-            // Start all Connectors
-            for(Iterator it=connectors.iterator(); it.hasNext();) {
-                tmp=(Connector)it.next();
-                tmp.start();
-            }
-
-//            running=true; GemStoneAddition
-        }
-
-
-        public void stop() {
-            Connector tmp;
-            for(Iterator it=connectors.iterator(); it.hasNext();) {
-                tmp=(Connector)it.next();
-                tmp.stop();
-            }
-            connectors.clear();
-            synchronized (this) { // GemStoneAddition
-              if (t != null) t.interrupt(); // GemStoneAddition
-              t=null;
-            }
-            if(mcast_sock != null) {
-                mcast_sock.close();
-//                mcast_sock=null; leave open, avoid NPE GemStoneAddition
-            }
-//            running=false; GemStoneAddition
-        }
-
-
-        public void run() {
-            // receive mcast packets on any interface of the list of interfaces we're listening on
-            DatagramPacket p=new DatagramPacket(receive_buffer, receive_buffer.length);
-            for (;;) { // GemStoneAddition -- avoid anti-pattern
-//              if (mcast_sock.isClosed()) break; // GemStoneAddition - but just let receivePacket fail, it's cheaper
-              if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition -- for safety
-                p.setData(receive_buffer, 0, receive_buffer.length);
-                try {
-                    receivePacket(p, mcast_sock, this);
-                }
-                catch(Exception th) {
-                  if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition
-//                    if(th == null || mcast_sock == null || mcast_sock.isClosed())
-//                        break;
-                    if(mylog.isErrorEnabled()) mylog.error(ExternalStrings.UDP_NIO_EXCEPTION_0, th);
-                    try { // GemStoneAddition
-                      Util.sleep(300); // so we don't get into 100% cpu spinning (should NEVER happen !)
-                    }
-                    catch (InterruptedException e) {
-                      break; // exit loop and thread
-                    }
-                }
-            }
-//            t=null; GemStoneAddition
-        }
-
-
-        /**
-         * Returns a list of local addresses (one for each Connector)
-         * @return List<SocketAddress>
-         */
-        public List getConnectorAddresses() {
-            Connector c;
-            ArrayList ret=new ArrayList();
-            for(Iterator it=connectors.iterator(); it.hasNext();) {
-                c=(Connector)it.next();
-                ret.add(c.getLocalAddress());
-            }
-            return ret;
-        }
-
-        /** Sends a packet. If the destination is a multicast address, call send() on all connectors.
-         * If destination is not null, send the message using <em>any</em> Connector: if we send a unicast
-         * message, it doesn't matter to which interface we are bound; the kernel will choose the correct
-         * interface based on the destination and the routing table. Note that the receiver will have the
-         * interface which was chosen by the kernel to send the message as the receiver's address, so the
-         * correct Connector will receive a possible response.
-         * @param msg
-         * @throws Exception
-         */
-        public void send(DatagramPacket msg) throws Exception {
-            InetAddress dest;
-
-            if(msg == null)
-                return;
-            dest=msg.getAddress();
-            if(dest == null)
-                throw new IOException("UDP_NIO.ConnectorTable.send(): destination address is null");
-
-            if(dest.isMulticastAddress()) {
-                // send to all Connectors
-                for(int i=0; i < connectors.size(); i++) {
-                    ((Connector)connectors.get(i)).send(msg);
-                }
-            }
-            else {
-                // send to a random connector
-                Connector c=pickRandomConnector(connectors);
-                c.send(msg);
-            }
-        }
-
-        private Connector pickRandomConnector(Vector conns) {
-            int size=conns.size();
-            int index=((int)(Util.random(size))) -1;
-            return (Connector)conns.get(index);
-        }
-
-        /**
-         * Adds the given interface address to the list of interfaces on which the receiver mcast
-         * socket has to listen.
-         * Also creates a new Connector. Calling this method twice on the same interface will throw an exception
-         * @param bind_interface
-         * @param local_port
-         * @param port_range
-         * @param receive_buffer_size
-         * @throws IOException
-         */
-        public void listenOn(String bind_interface, int local_port, int port_range, 
-                             int receive_buffer_size, int receiver_sock_buf_size, int send_sock_buf_size,
-                             int ip_ttl, Receiver receiver) throws IOException {
-            if(bind_interface == null)
-                return;
-
-            NetworkInterface ni=NetworkInterface.getByInetAddress(InetAddress.getByName(bind_interface));
-            if(ni == null)
-                throw new IOException("UDP_NIO.ConnectorTable.listenOn(): bind interface for " +
-                        bind_interface + " not found");
-
-            Connector tmp=findConnector(ni);
-            if(tmp != null) {
-                if(mywarn) mylog.warn("connector for interface " + bind_interface +
-                        " is already present (will be skipped): " + tmp);
-                return;
-            }
-
-            // 1. join the group on this interface
-            if(mcast_sock != null) {
-                mcast_sock.joinGroup(mcastAddr, ni);
-
-                    if(mylog.isInfoEnabled()) mylog.info(ExternalStrings.UDP_NIO_JOINING__0__ON_INTERFACE__1, new Object[] {mcastAddr, ni});
-            }
-
-            // 2. create a new Connector
-            tmp=new Connector(ni, local_port, port_range, receive_buffer_size, receiver_sock_buf_size,
-                    send_sock_buf_size, ip_ttl, receiver);
-            connectors.add(tmp);
-        }
-
-        private Connector findConnector(NetworkInterface ni) {
-            for(int i=0; i < connectors.size(); i++) {
-                Connector c=(Connector)connectors.elementAt(i);
-                if(c.getBindInterface().equals(ni))
-                    return c;
-            }
-            return null;
-        }
-
-
-        public void receive(DatagramPacket packet) {
-            if(receiver != null) {
-                receiver.receive(packet);
-            }
-        }
-
-
-        @Override // GemStoneAddition
-        public String toString() {
-            StringBuffer sb=new StringBuffer();
-            sb.append("*** todo: implement ***");
-            return sb.toString();
-        }
-
-
-        public static void receivePacket(DatagramPacket packet, DatagramSocket sock, Receiver receiver) throws IOException {
-            int len;
-
-            sock.receive(packet);
-            len=packet.getLength();
-            if(len == 1 && packet.getData()[0] == 0) {
-                if(mylog.isTraceEnabled()) mylog.trace("received dummy packet");
-                return;
-            }
-            if(receiver != null)
-                receiver.receive(packet);
-        }
-    }
-
-
-
-
-
-
-    public static class MyReceiver implements Receiver {
-        ConnectorTable t=null;
-
-        public MyReceiver() {
-
-        }
-
-        public void setConnectorTable(ConnectorTable t) {
-            this.t=t;
-        }
-
-        public void receive(DatagramPacket packet) {
-            System.out.println("-- received " + packet.getLength() + " bytes from " + packet.getSocketAddress());
-            InetAddress sender=packet.getAddress();
-            byte[] buf=packet.getData();
-            int len=packet.getLength();
-            String tmp=new String(buf, 0, len);
-            if(len > 4) {
-                if(tmp.startsWith("rsp:")) {
-                    System.out.println("-- received respose: \"" + tmp + '\"');
-                    return;
-                }
-            }
-
-            byte[] rsp_buf=("rsp: this is a response to " + tmp).getBytes();
-            DatagramPacket response=new DatagramPacket(rsp_buf, rsp_buf.length, sender, packet.getPort());
-
-            try {
-                t.send(response);
-            }
-            catch(Exception e) {
-                e.printStackTrace();
-                System.err.println("MyReceiver: problem sending response to " + sender);
-            }
-        }
-    }
-
-
-
-    public static class MulticastReceiver implements Runnable {
-//        Unmarshaller m=null; GemStoneAddition
-//        DatagramSocket sock=null; // may be DatagramSocket or MulticastSocket GemStoneAddition(omitted)
-
-        public void run() {
-            // receives packet from socket
-            // calls Unmarshaller.receive()
-        }
-
-    }
-
-    public static class Unmarshaller  {
-//        Queue q=null; GemStoneAddition
-
-        void receive(byte[] data, SocketAddress sender) {
-            // if (q) --> q.add()
-            // unserialize and call handleMessage()
-        }
-    }
-
-
-
-    static void help() {
-        System.out.println("UDP_NIO [-help] [-bind_addrs <list of interfaces>]");
-    }
-
-
-
-    public static void main(String[] args) {
-        MyReceiver        r=new MyReceiver();
-        ConnectorTable    ct;
-        String            line;
-        InetSocketAddress mcast_addr;
-        BufferedReader    in=null;
-        DatagramPacket    packet;
-        byte[]            send_buf;
-        int               receive_buffer_size=65000;
-        boolean           ip_mcast=true;
-
-        try {
-            mcast_addr=new InetSocketAddress("230.1.2.3", 7500);
-            ct=new ConnectorTable(mcast_addr, receive_buffer_size, 120000, ip_mcast, r);
-            r.setConnectorTable(ct);
-        }
-        catch(Exception t) {
-            t.printStackTrace();
-            return;
-        }
-
-        for(int i=0; i < args.length; i++) {
-            if("-help".equals(args[i])) {
-                help();
-                continue;
-            }
-            if("-bind_addrs".equals(args[i])) {
-                while(++i < args.length && !args[i].trim().startsWith("-")) {
-                    try {
-                        ct.listenOn(args[i], 0, 1, receive_buffer_size, 120000, 12000, 32, r);
-                    }
-                    catch(IOException e) {
-                        e.printStackTrace();
-                        return;
-                    }
-                }
-            }
-        }
-
-
-        try {
-            ct.start(); // starts all Connectors in turn
-            in=new BufferedReader(new InputStreamReader(System.in));
-            while(true) {
-                System.out.print("> "); System.out.flush();
-                line=in.readLine();
-                if(line.startsWith("quit") || line.startsWith("exit"))
-                    break;
-                send_buf=line.getBytes();
-                packet=new DatagramPacket(send_buf, send_buf.length, mcast_addr);
-                ct.send(packet);
-            }
-        }
-        catch(Exception e) {
-            e.printStackTrace();
-        }
-        finally {
-//            if(ct != null) GemStoneAddition (cannot be null)
-                ct.stop();
-        }
-    }
-
-
-
-
-}
-
-
-interface Receiver {
-
-    /** Called when data has been received on a socket. When the callback returns, the buffer will be
-     * reused: therefore, if <code>buf</code> must be processed on a separate thread, it needs to be copied.
-     * This method might be called concurrently by multiple threads, so it has to be reentrant
-     * @param packet
-     */
-    void receive(DatagramPacket packet);
-}