You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/23 09:12:44 UTC

svn commit: r388098 - in /tomcat/container/tc5.5.x/modules/groupcom: ./ src/share/org/apache/catalina/tribes/ src/share/org/apache/catalina/tribes/tcp/ src/share/org/apache/catalina/tribes/tcp/nio/ src/share/org/apache/catalina/tribes/tipis/ test/org/a...

Author: fhanik
Date: Thu Mar 23 00:12:41 2006
New Revision: 388098

URL: http://svn.apache.org/viewcvs?rev=388098&view=rev
Log:
More adjustments

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelException.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
    tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
    tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
    tomcat/container/tc5.5.x/modules/groupcom/to-do.txt

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelException.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelException.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelException.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelException.java Thu Mar 23 00:12:41 2006
@@ -58,6 +58,12 @@
         return buf.toString();
     }
     
+    public void addFaultyMember(Member[] mbrs) {
+        for (int i=0; mbrs!=null && i<mbrs.length; i++ ) {
+            addFaultyMember(mbrs[i]);
+        }
+    }
+
     public void addFaultyMember(Member mbr) {
         if ( this.faultyMembers==null ) this.faultyMembers = new ArrayList();
         faultyMembers.add(mbr);

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java Thu Mar 23 00:12:41 2006
@@ -21,9 +21,6 @@
 
 
 /**
- * Send cluster messages with a pool of sockets (25).
- * 
- * FIXME support processing stats
  * 
  * @author Filip Hanik
  * @version 1.0
@@ -84,7 +81,7 @@
      * @return boolean
      */
     public boolean isSuspect() {
-        return state == SUSPECT;
+        return (state == SUSPECT) || (state == FAILING);
     }
 
     public void setSuspect() {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java Thu Mar 23 00:12:41 2006
@@ -17,7 +17,6 @@
 package org.apache.catalina.tribes.tcp.nio;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
@@ -26,15 +25,14 @@
 import java.nio.channels.SocketChannel;
 import java.util.Iterator;
 
-import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.ChannelReceiver;
-import org.apache.catalina.tribes.MessageListener;
 import org.apache.catalina.tribes.io.ListenCallback;
 import org.apache.catalina.tribes.io.ObjectReader;
 import org.apache.catalina.tribes.tcp.Constants;
-import org.apache.catalina.util.StringManager;
 import org.apache.catalina.tribes.tcp.ReceiverBase;
-import org.apache.catalina.tribes.tcp.*;
+import org.apache.catalina.tribes.tcp.ThreadPool;
+import org.apache.catalina.tribes.tcp.WorkerThread;
+import org.apache.catalina.util.StringManager;
 
 /**
  * @author Filip Hanik

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java Thu Mar 23 00:12:41 2006
@@ -139,8 +139,6 @@
                 reader.append(buffer.array(),0,count,false);
             else 
                 reader.append(buffer,count,false);
-            
-            
             buffer.clear();		// make buffer empty
         }
         
@@ -176,6 +174,7 @@
         
         if (count < 0) {
             // close channel on EOF, invalidates the key
+            if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting");
             channel.close();
             return;
         }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java Thu Mar 23 00:12:41 2006
@@ -81,6 +81,10 @@
     public boolean process(SelectionKey key, boolean waitForAck) throws IOException {
         int ops = key.readyOps();
         key.interestOps(key.interestOps() & ~ops);
+        
+        //in case disconnect has been called
+        if ((!isConnected()) && (!connecting)) throw new IOException("Sender has been disconnected, can't selection key.");
+        if ( !key.isValid() ) throw new IOException("Key is not valid, it must have been cancelled.");
         if ( key.isConnectable() ) {
             if ( socketChannel.finishConnect() ) {
                 //we connected, register ourselves for writing
@@ -91,6 +95,7 @@
                 socketChannel.socket().setSendBufferSize(getTxBufSize());
                 socketChannel.socket().setReceiveBufferSize(getRxBufSize());
                 socketChannel.socket().setSoTimeout((int)getTimeout());
+                socketChannel.socket().setSoLinger(false,0);
                 if ( current != null ) key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                 return false;
             } else  { 
@@ -109,7 +114,7 @@
                     //if not, we are ready, setMessage will reregister us for another write interest
                     //do a health check, we have no way of verify a disconnected
                     //socket since we don't register for OP_READ on waitForAck=false
-                    read(key);//this causes overhead.
+                    read(key);//this causes overhead
                     setRequestCount(getRequestCount()+1);
                     return true;
                 }
@@ -216,14 +221,12 @@
             setConnected(false);
             if ( socketChannel != null ) {
                 try {
-                    Socket socket = null;
-                    //socket = socketChannel.socket();
+                    try {socketChannel.socket().close();}catch ( Exception x){}
                     //error free close, all the way
                     //try {socket.shutdownOutput();}catch ( Exception x){}
                     //try {socket.shutdownInput();}catch ( Exception x){}
                     //try {socket.close();}catch ( Exception x){}
                     try {socketChannel.close();}catch ( Exception x){}
-                    socket = null;
                 }finally {
                     socketChannel = null;
                 }
@@ -232,7 +235,6 @@
             log.error("Unable to disconnect NioSender. msg="+x.getMessage());
             if ( log.isDebugEnabled() ) log.debug("Unable to disconnect NioSender. msg="+x.getMessage(),x);
         } finally {
-            reset();
         }
 
     }
@@ -264,11 +266,11 @@
     * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method
     */
    public synchronized void setMessage(byte[] data) throws IOException {
-       reset();
        if ( data != null ) {
            current = data;
            remaining = current.length;
            curPos = 0;
+           ackbuf.clear();
            if (isConnected()) {
                socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this);
            }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Thu Mar 23 00:12:41 2006
@@ -33,6 +33,7 @@
 import org.apache.catalina.tribes.tcp.AbstractSender;
 import java.net.UnknownHostException;
 import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.tipis.RpcChannel;
 
 /**
  * <p>Title: </p>
@@ -56,6 +57,7 @@
 
     public ParallelNioSender() throws IOException {
         selector = Selector.open();
+        setConnected(true);
     }
     
     
@@ -67,16 +69,29 @@
         setData(senders,data);
         
         int remaining = senders.length;
+        ChannelException cx = null;
         try {
             //loop until complete, an error happens, or we timeout
             long delta = System.currentTimeMillis() - start;
+            boolean waitForAck = (Channel.SEND_OPTIONS_USE_ACK & msg.getOptions()) == Channel.SEND_OPTIONS_USE_ACK;
             while ( (remaining>0) && (delta<getTimeout()) ) {
-                remaining -= doLoop(selectTimeout,getMaxRetryAttempts(),(Channel.SEND_OPTIONS_USE_ACK&msg.getOptions())==Channel.SEND_OPTIONS_USE_ACK);
+                try {
+                    remaining -= doLoop(selectTimeout, getMaxRetryAttempts(),waitForAck);
+                } catch (Exception x ) {
+                    if ( cx == null ) {
+                        if ( x instanceof ChannelException ) cx = (ChannelException)x;
+                        else cx = new ChannelException("Parallel NIO send failed.", x);
+                    } else {
+                        if (x instanceof ChannelException) cx.addFaultyMember( ( (ChannelException) x).getFaultyMembers());
+                    }
+                }
+                //bail out if all remaining senders are failing
+                if ( cx != null && cx.getFaultyMembers().length == remaining ) throw cx;
                 delta = System.currentTimeMillis() - start;
             }
             if ( remaining > 0 ) {
                 //timeout has occured
-                ChannelException cx = new ChannelException("Operation has timed out("+getTimeout()+" ms.).");
+                cx = new ChannelException("Operation has timed out("+getTimeout()+" ms.).");
                 for (int i=0; i<senders.length; i++ ) {
                     if (!senders[i].isComplete() ) cx.addFaultyMember(senders[i].getDestination());
                 }
@@ -107,20 +122,35 @@
             NioSender sender = (NioSender) sk.attachment();
             try {
                 if (sender.process(sk,waitForAck)) {
-                    sender.reset();
                     completed++;
                     sender.setComplete(true);
                     SenderState.getSenderState(sender.getDestination()).setReady();
                 }//end if
             } catch (Exception x) {
                 SenderState state = SenderState.getSenderState(sender.getDestination());
-                if ( state.isReady() ) {
-                    log.warn("Member send is failing for:"+sender.getDestination().getName()+"; Setting to suspect.");
+                int attempt = sender.getAttempt()+1;
+                boolean retry = (sender.getAttempt() <= maxAttempts && maxAttempts>0);
+                synchronized (state) {
+                
+                    //sk.cancel();
+                    if (state.isSuspect()) state.setFailing();
+                    if (state.isReady()) {
+                        state.setSuspect();
+                        if ( retry ) 
+                            log.warn("Member send is failing for:" + sender.getDestination().getName() +" ; Setting to suspect and retrying.");
+                        else 
+                            log.warn("Member send is failing for:" + sender.getDestination().getName() +" ; Setting to suspect.", x);
+                    }                    
                 }
-                state.setSuspect();
+                if ( !isConnected() ) {
+                    log.warn("Not retrying send for:" + sender.getDestination().getName() + "; Sender is disconnected.");
+                    ChannelException cx = new ChannelException("Send failed, and sender is disconnected. Not retrying.",x);
+                    cx.addFaultyMember(sender.getDestination());
+                    throw cx;
+                }
+                
                 byte[] data = sender.getMessage();
-                int attempt = sender.getAttempt()+1;
-                if ( sender.getAttempt() <= maxAttempts && maxAttempts>0 ) {
+                if ( retry ) {
                     try { 
                         sender.disconnect(); 
                         sender.connect();
@@ -232,8 +262,9 @@
 
     
     public synchronized void disconnect() {
-        try {close(); }catch (Exception x){}
         setConnected(false);
+        try {close(); }catch (Exception x){}
+        
     }
     
     public void finalize() {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Thu Mar 23 00:12:41 2006
@@ -135,26 +135,26 @@
 
         //transfer state from another map
         transferState();
-        broadcast(MapMessage.MSG_START,true);
+        try {
+            broadcast(MapMessage.MSG_START, true);
+        } catch (ChannelException x) {
+            log.warn("Unable to send map start message.");
+        }
 
     }
 
-    private void broadcast(int msgtype, boolean rpc) {
-        try {
-            //send out a map membership message, only wait for the first reply
-            MapMessage msg = new MapMessage(this.mapContextName, msgtype,
-                                            false, null, null, null, wrap(channel.getLocalMember(false)));
-            if ( rpc) {
-                Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
-                for (int i = 0; i < resp.length; i++) {
-                    mapMemberAdded(resp[i].getSource());
-                    messageReceived(resp[i].getMessage(), resp[i].getSource());
-                }
-            } else {
-                channel.send(channel.getMembers(),msg,channelSendOptions);
+    private void broadcast(int msgtype, boolean rpc) throws ChannelException {
+        //send out a map membership message, only wait for the first reply
+        MapMessage msg = new MapMessage(this.mapContextName, msgtype,
+                                        false, null, null, null, wrap(channel.getLocalMember(false)));
+        if ( rpc) {
+            Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
+            for (int i = 0; i < resp.length; i++) {
+                mapMemberAdded(resp[i].getSource());
+                messageReceived(resp[i].getMessage(), resp[i].getSource());
             }
-        } catch (ChannelException x) {
-            log.warn("Unable to send map start message.");
+        } else {
+            channel.send(channel.getMembers(),msg,channelSendOptions);
         }
     }
 
@@ -163,7 +163,7 @@
     }
 
     public void finalize() {
-        broadcast(MapMessage.MSG_STOP,false);
+        try {broadcast(MapMessage.MSG_STOP,false); }catch ( Exception ignore){}
         //cleanup
         if (this.rpcChannel != null) {
             this.rpcChannel.breakdown();
@@ -293,14 +293,14 @@
      * @param msg Serializable
      * @return Serializable - null if no reply should be sent
      */
-    public Serializable replyRequest(Serializable msg, Member sender) {
+    public Serializable replyRequest(Serializable msg, final Member sender) {
         if (! (msg instanceof MapMessage))return null;
         MapMessage mapmsg = (MapMessage) msg;
 
         //map start request
         if (mapmsg.getMsgType() == mapmsg.MSG_START) {
-            mapMemberAdded(sender);
             mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
+            mapMemberAdded(sender);
             return mapmsg;
         }
 
@@ -417,12 +417,12 @@
                         }
                     } else {
                         if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
-                        diff.setOwner(getMapOwner());
+                        ((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner());
                     } //end if
                 } else if  (mapmsg.getValue() instanceof ReplicatedMapEntry) {
                     ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue();
                     re.setOwner(getMapOwner());
-                    if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
+                    entry.setValue(re);
                 } else {
                     if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
                 } //end if
@@ -439,28 +439,35 @@
     }
 
     public void mapMemberAdded(Member member) {
-        if ( member.equals(getChannel().getLocalMember(false)) ) return;        
+        if ( member.equals(getChannel().getLocalMember(false)) ) return;
+        boolean memberAdded = false;
         //select a backup node if we don't have one
         synchronized (mapMembers) {
-            if (!mapMembers.contains(member) ) mapMembers.add(member);
+            if (!mapMembers.contains(member) ) {
+                mapMembers.add(member);
+                memberAdded = true;
+            }
         }
-        synchronized (stateMutex) {
-            Iterator i = super.entrySet().iterator();
-            while (i.hasNext()) {
-                Map.Entry e = (Map.Entry) i.next();
-                MapEntry entry = (MapEntry) e.getValue();
-                if ( entry == null ) continue;
-                if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) {
-                    try {
-                        Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
-                        entry.setBackupNodes(backup);
-                    } catch (ChannelException x) {
-                        log.error("Unable to select backup node.", x);
-                    } //catch
-                } //end if
-            } //while
-        } //synchronized
-
+        if ( memberAdded ) {
+            printMap("BEFORE MEMBER ADD");
+            synchronized (stateMutex) {
+                Iterator i = super.entrySet().iterator();
+                while (i.hasNext()) {
+                    Map.Entry e = (Map.Entry) i.next();
+                    MapEntry entry = (MapEntry) e.getValue();
+                    if ( entry == null ) continue;
+                    if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) {
+                        try {
+                            Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
+                            entry.setBackupNodes(backup);
+                        } catch (ChannelException x) {
+                            log.error("Unable to select backup node.", x);
+                        } //catch
+                    } //end if
+                } //while
+            } //synchronized
+            printMap("AFTER MEMBER ADD");
+        }//end if
     }
     
     public boolean inSet(Member m, Member[] set) {
@@ -471,16 +478,27 @@
         return result;
     }
 
+    public Member[] excludeFromSet(Member[] mbrs, Member[] set) {
+        ArrayList result = new ArrayList();
+        for (int i=0; i<set.length; i++ ) {
+            boolean include = true;
+            for (int j=0; j<mbrs.length; j++ ) 
+                if ( mbrs[j].equals(set[i]) ) include = false;
+            if ( include ) result.add(set[i]);
+        }
+        return (Member[])result.toArray(new Member[result.size()]);
+    }
+
     public void memberAdded(Member member) {
         //do nothing
     }
 
     public void memberDisappeared(Member member) {
+        printMap("BEFORE MEMBER GONE");
+        boolean removed = false;
         synchronized (mapMembers) {
-            mapMembers.remove(member);
+            removed = mapMembers.remove(member);
         }
-        //todo move all sessions that are primary here to and have this member as
-        //a backup
         Iterator i = super.entrySet().iterator();
         while (i.hasNext()) {
             Map.Entry e = (Map.Entry) i.next();
@@ -494,6 +512,7 @@
                 }
             } //end if
         } //while
+        printMap("AFTER MEMBER GONE");
     }
 
     int currentNode = 0;
@@ -547,8 +566,9 @@
         private Serializable value;
 
         public MapEntry(Serializable key, Serializable value) {
-            this.key = key;
-            this.value = value;
+            setKey(key);
+            setValue(value);
+            
         }
 
         public boolean isBackup() {
@@ -591,11 +611,21 @@
         public Object setValue(Object value) {
             Object old = this.value;
             this.value = (Serializable) value;
+            if ( value==null ) {
+                Exception x = new Exception(this.toString());
+                x.printStackTrace();
+            }
             return old;
         }
 
         public Object getKey() {
             return key;
+        }
+        
+        public Object setKey(Object key) {
+            Object old = this.key;
+            this.key = (Serializable)key;
+            return old;
         }
 
         public int hashCode() {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Thu Mar 23 00:12:41 2006
@@ -125,27 +125,33 @@
      */
     protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException {
         //select a backup node
-        Member backup = getNextBackupNode();
+        Member next = getNextBackupNode();
         
-        if ( backup == null ) return null;
+        if ( next == null ) return null;
         
+        Member[] backup = wrap(next);
+        MapMessage msg = null;
         //publish the data out to all nodes
-        MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false,
-                                        (Serializable) key, null, null, wrap(backup));
-        getChannel().send(getMapMembers(), msg,getChannelSendOptions());
-
+        Member[] proxies = excludeFromSet(backup,getMapMembers());
+        if ( proxies.length > 0 ) {
+            msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false,
+                                            (Serializable) key, null, null, backup);
+            getChannel().send(proxies, msg, getChannelSendOptions());
+        }
         //publish the backup data to one node
         msg = new MapMessage(getMapContextName(), MapMessage.MSG_BACKUP, false,
-                             (Serializable) key, (Serializable) value, null, wrap(backup));
-        getChannel().send(new Member[] {backup}, msg, getChannelSendOptions());
-        return wrap(backup);
+                             (Serializable) key, (Serializable) value, null, backup);
+        getChannel().send(backup, msg, getChannelSendOptions());
+        return backup;
     }
     
     public Object get(Object key) {
         MapEntry entry = (MapEntry)super.get(key);
+        if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" entry:"+entry);
         System.out.println("Requesting id:"+key+" entry:"+entry);
         if ( entry == null ) return null;
         if ( !entry.isPrimary() ) {
+            if ( entry.isProxy() ) System.out.println("PROXY:Requesting id:"+key+" entry:"+entry);
             //if the message is not primary, we need to retrieve the latest value
             try {
                 Member[] backup = null;
@@ -188,7 +194,7 @@
                 return null;
             }
         }
-        System.out.println("Requesting id:"+key+" result:"+entry.getValue());
+        if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" result:"+entry.getValue());
         return entry.getValue();
     }
 

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java Thu Mar 23 00:12:41 2006
@@ -101,7 +101,6 @@
         return collector.getResponses();
     }
     
-    
     public void messageReceived(Serializable msg, Member sender) {
         RpcMessage rmsg = (RpcMessage)msg;
         RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid);

Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java Thu Mar 23 00:12:41 2006
@@ -145,7 +145,7 @@
                             Thread.sleep(pause);
                         }
                     } catch (ChannelException x) {
-                        log.error("Unable to send message:"+x.getMessage());
+                        log.error("Unable to send message:"+x.getMessage(),x);
                         Member[] faulty = x.getFaultyMembers();
                         for (int i=0; i<faulty.length; i++ ) log.error("Faulty: "+faulty[i]);
                         --counter;

Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java Thu Mar 23 00:12:41 2006
@@ -49,9 +49,9 @@
         table = SimpleTableDemo.createAndShowGUI(map,channel.getLocalMember(false).getName());
         channel.addChannelListener(this);
         channel.addMembershipListener(this);
-        for ( int i=0; i<1000; i++ ) {
-            map.put("MyKey-"+i,"My String Value-"+i);
-        }
+//        for ( int i=0; i<1000; i++ ) {
+//            map.put("MyKey-"+i,"My String Value-"+i);
+//        }
         this.messageReceived(null,null);
     }
     

Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Thu Mar 23 00:12:41 2006
@@ -27,6 +27,11 @@
  
  8. Future version will have WAN membership and replication
 
+Bugs:
+===========================================
+ a) Somehow the first NIO connection made always closes down; investigate why
+ b) State synchronization for the map
+
 Code Tasks:
 ===========================================
 32. Replicated JNDI entries in Tomcat in the format



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org