You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/14 06:44:48 UTC

svn commit: r385742 - in /tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes: group/ group/interceptors/ io/ mcast/ tcp/nio/

Author: fhanik
Date: Mon Mar 13 21:44:46 2006
New Revision: 385742

URL: http://svn.apache.org/viewcvs?rev=385742&view=rev
Log:
Optimizing the interceptors

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastServiceImpl.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=385742&r1=385741&r2=385742&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java Mon Mar 13 21:44:46 2006
@@ -37,6 +37,7 @@
     private ChannelReceiver clusterReceiver;
     private ChannelSender clusterSender;
     private MembershipService membershipService;
+    private boolean started = false;
 
     public ChannelCoordinator() {
         
@@ -75,15 +76,18 @@
      * SND_RX_SEQ - starts the replication receiver<BR>
      * @throws ChannelException if a startup error occurs or the service is already started.
      */
-    public void start(int svc) throws ChannelException {
+    public synchronized void start() throws ChannelException {
         try {
+            if (started) return;
+            //must start the receiver first so that we can coordinate the port it
+            //listens to with the local membership settings
+            clusterReceiver.start();
+            clusterSender.start();
             //synchronize, big time FIXME
             membershipService.setLocalMemberProperties(getClusterReceiver().getHost(), getClusterReceiver().getPort());
-            //end FIXME
-            if ( (svc & Channel.SND_RX_SEQ) == Channel.SND_RX_SEQ) clusterReceiver.start();
-            if ( (svc & Channel.SND_TX_SEQ) == Channel.SND_TX_SEQ) clusterSender.start();
-            if ( (svc & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ) membershipService.start(MembershipService.MBR_RX);
-            if ( (svc & Channel.MBR_TX_SEQ) == Channel.MBR_TX_SEQ) membershipService.start(MembershipService.MBR_TX);
+            membershipService.start(MembershipService.MBR_RX);
+            membershipService.start(MembershipService.MBR_TX);
+            this.started = true;
         }catch ( ChannelException cx ) {
             throw cx;
         }catch ( Exception x ) {
@@ -102,14 +106,16 @@
      * SND_RX_SEQ - starts the replication receiver<BR>
      * @throws ChannelException if a startup error occurs or the service is already started.
      */
-    public void stop(int svc) throws ChannelException {
+    public void stop() throws ChannelException {
         try {
-            if ( (svc & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ) membershipService.stop();
-            if ( (svc & Channel.SND_RX_SEQ) == Channel.SND_RX_SEQ) clusterReceiver.stop();
-            if ( (svc & Channel.SND_TX_SEQ) == Channel.SND_TX_SEQ) clusterSender.stop();
-            if ( (svc & Channel.MBR_TX_SEQ) == Channel.MBR_RX_SEQ) membershipService.stop();
+            membershipService.stop();
+            clusterReceiver.stop();
+            clusterSender.stop();
+            membershipService.stop();
         }catch ( Exception x ) {
             throw new ChannelException(x);
+        } finally {
+            started = false;
         }
 
     }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=385742&r1=385741&r2=385742&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Mon Mar 13 21:44:46 2006
@@ -173,7 +173,7 @@
      * @throws ChannelException if a startup error occurs or the service is already started.
      */
     public void start(int svc) throws ChannelException {
-        coordinator.start(svc);
+        coordinator.start();
     }
 
     /**
@@ -188,7 +188,7 @@
      * @throws ChannelException if a startup error occurs or the service is already started.
      */
     public void stop(int svc) throws ChannelException {
-        coordinator.stop(svc);
+        coordinator.stop();
     }
 
     public ChannelReceiver getChannelReceiver() {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?rev=385742&r1=385741&r2=385742&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java Mon Mar 13 21:44:46 2006
@@ -60,12 +60,14 @@
 
     public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
         for ( int i=0; i<destination.length; i++ ) {
-            ChannelMessage tmp = msg.clone();
             int nr = incCounter(destination[i]);
             //reduce byte copy
-            //tmp.getMessage().append(XByteBuffer.toBytes(nr),0,4);
-            tmp.getMessage().append(nr);
-            getNext().sendMessage(new Member[] {destination[i]}, tmp, payload);
+            msg.getMessage().append(nr);
+            try {
+                getNext().sendMessage(new Member[] {destination[i]}, msg, payload);
+            }finally {
+                msg.getMessage().trim(4);
+            }
         }
     }
 

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java?rev=385742&r1=385741&r2=385742&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java Mon Mar 13 21:44:46 2006
@@ -212,9 +212,20 @@
         } else return false;
     }
     
+    /**
+     * Create a shallow clone, only the data gets recreated
+     * @return ClusterData
+     */
     public ClusterData clone() {
-        byte[] d = this.getDataPackage();
-        return ClusterData.getDataFromPackage(d);
+//        byte[] d = this.getDataPackage();
+//        return ClusterData.getDataFromPackage(d);
+        ClusterData clone = new ClusterData(false);
+        clone.options = this.options;
+        clone.message = new XByteBuffer(this.message.getBytesDirect(),false);
+        clone.timestamp = this.timestamp;
+        clone.uniqueId = this.uniqueId;
+        clone.address = this.address;
+        return clone;
     }
     
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java?rev=385742&r1=385741&r2=385742&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java Mon Mar 13 21:44:46 2006
@@ -207,7 +207,7 @@
        int dlen = XByteBuffer.toInt(dlend, 0);
        byte[] domaind = new byte[dlen];
        System.arraycopy(data, 20, domaind, 0, domaind.length);
-       member.setDomain(new String(domaind));
+       member.domain = domaind;
        member.setHost(addr);
        member.setPort(XByteBuffer.toInt(portd, 0));
        member.setMemberAliveTime(XByteBuffer.toLong(alived, 0));

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastServiceImpl.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastServiceImpl.java?rev=385742&r1=385741&r2=385742&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastServiceImpl.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastServiceImpl.java Mon Mar 13 21:44:46 2006
@@ -210,6 +210,7 @@
         McastMember m = McastMember.getMember(data);
         if(log.isDebugEnabled())
             log.debug("Mcast receive ping from member " + m);
+
         if ( membership.memberAlive(m) ) {
             if(log.isDebugEnabled())
                 log.debug("Mcast add member " + m);

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java?rev=385742&r1=385741&r2=385742&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java Mon Mar 13 21:44:46 2006
@@ -64,6 +64,7 @@
     private int tcpThreadCount;
     private long tcpSelectorTimeout;
     private Selector selector = null;
+    private ServerSocketChannel serverChannel = null;
 
     private java.net.InetAddress bind;
     private String tcpListenAddress;
@@ -135,6 +136,7 @@
         }
         try {
             getBind();
+            this.bind();
             Thread t = new Thread(this, "NioReceiver");
             t.setDaemon(true);
             t.start();
@@ -142,31 +144,63 @@
             log.fatal("Unable to start cluster receiver", x);
         }
     }
-
+    
     /**
-     * get data from channel and store in byte array
-     * send it to cluster
+     * recursive bind to find the next available port
+     * @param socket ServerSocket
+     * @param portstart int
+     * @param retries int
+     * @return int
      * @throws IOException
-     * @throws java.nio.channels.ClosedChannelException
      */
-    protected void listen() throws Exception {
-        if (doListen) {
-            log.warn("ServerSocketChannel allready started");
-            return;
+    protected int bind(ServerSocket socket, int portstart, int retries) throws IOException {
+        while ( retries > 0 ) {
+            try {
+                InetSocketAddress addr = new InetSocketAddress(getBind(), portstart);
+                socket.bind(addr);
+                setTcpListenPort(portstart);
+                log.info("Nio Server Socket bound to:"+addr);
+                return 0;
+            }catch ( IOException x) {
+                retries--;
+                if ( retries <= 0 ) throw x;
+                portstart++;
+                retries = bind(socket,portstart,retries);
+            }
         }
-        doListen = true;
+        return retries;
+    }
+    
+    protected void bind() throws IOException {
         // allocate an unbound server socket channel
-        ServerSocketChannel serverChannel = ServerSocketChannel.open();
+        serverChannel = ServerSocketChannel.open();
         // Get the associated ServerSocket to bind it with
         ServerSocket serverSocket = serverChannel.socket();
         // create a new Selector for use below
         selector = Selector.open();
         // set the port the server channel will listen to
-        serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort()));
+        //serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort()));
+        bind(serverSocket,getTcpListenPort(),10);
         // set non-blocking mode for the listening socket
         serverChannel.configureBlocking(false);
         // register the ServerSocketChannel with the Selector
         serverChannel.register(selector, SelectionKey.OP_ACCEPT);
+        
+    }
+    /**
+     * get data from channel and store in byte array
+     * send it to cluster
+     * @throws IOException
+     * @throws java.nio.channels.ClosedChannelException
+     */
+    protected void listen() throws Exception {
+        if (doListen) {
+            log.warn("ServerSocketChannel allready started");
+            return;
+        }
+        
+        doListen = true;
+
         while (doListen && selector != null) {
             // this may block for a long time, upon return the
             // selected set contains keys of the ready channels



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org