You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/23 09:12:44 UTC
svn commit: r388098 - in /tomcat/container/tc5.5.x/modules/groupcom: ./
src/share/org/apache/catalina/tribes/
src/share/org/apache/catalina/tribes/tcp/
src/share/org/apache/catalina/tribes/tcp/nio/
src/share/org/apache/catalina/tribes/tipis/ test/org/a...
Author: fhanik
Date: Thu Mar 23 00:12:41 2006
New Revision: 388098
URL: http://svn.apache.org/viewcvs?rev=388098&view=rev
Log:
More adjustments
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelException.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelException.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelException.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelException.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelException.java Thu Mar 23 00:12:41 2006
@@ -58,6 +58,12 @@
return buf.toString();
}
+ public void addFaultyMember(Member[] mbrs) {
+ for (int i=0; mbrs!=null && i<mbrs.length; i++ ) {
+ addFaultyMember(mbrs[i]);
+ }
+ }
+
public void addFaultyMember(Member mbr) {
if ( this.faultyMembers==null ) this.faultyMembers = new ArrayList();
faultyMembers.add(mbr);
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java Thu Mar 23 00:12:41 2006
@@ -21,9 +21,6 @@
/**
- * Send cluster messages with a pool of sockets (25).
- *
- * FIXME support processing stats
*
* @author Filip Hanik
* @version 1.0
@@ -84,7 +81,7 @@
* @return boolean
*/
public boolean isSuspect() {
- return state == SUSPECT;
+ return (state == SUSPECT) || (state == FAILING);
}
public void setSuspect() {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java Thu Mar 23 00:12:41 2006
@@ -17,7 +17,6 @@
package org.apache.catalina.tribes.tcp.nio;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
@@ -26,15 +25,14 @@
import java.nio.channels.SocketChannel;
import java.util.Iterator;
-import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.ChannelReceiver;
-import org.apache.catalina.tribes.MessageListener;
import org.apache.catalina.tribes.io.ListenCallback;
import org.apache.catalina.tribes.io.ObjectReader;
import org.apache.catalina.tribes.tcp.Constants;
-import org.apache.catalina.util.StringManager;
import org.apache.catalina.tribes.tcp.ReceiverBase;
-import org.apache.catalina.tribes.tcp.*;
+import org.apache.catalina.tribes.tcp.ThreadPool;
+import org.apache.catalina.tribes.tcp.WorkerThread;
+import org.apache.catalina.util.StringManager;
/**
* @author Filip Hanik
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java Thu Mar 23 00:12:41 2006
@@ -139,8 +139,6 @@
reader.append(buffer.array(),0,count,false);
else
reader.append(buffer,count,false);
-
-
buffer.clear(); // make buffer empty
}
@@ -176,6 +174,7 @@
if (count < 0) {
// close channel on EOF, invalidates the key
+ if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting");
channel.close();
return;
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java Thu Mar 23 00:12:41 2006
@@ -81,6 +81,10 @@
public boolean process(SelectionKey key, boolean waitForAck) throws IOException {
int ops = key.readyOps();
key.interestOps(key.interestOps() & ~ops);
+
+ //in case disconnect has been called
+ if ((!isConnected()) && (!connecting)) throw new IOException("Sender has been disconnected, can't selection key.");
+ if ( !key.isValid() ) throw new IOException("Key is not valid, it must have been cancelled.");
if ( key.isConnectable() ) {
if ( socketChannel.finishConnect() ) {
//we connected, register ourselves for writing
@@ -91,6 +95,7 @@
socketChannel.socket().setSendBufferSize(getTxBufSize());
socketChannel.socket().setReceiveBufferSize(getRxBufSize());
socketChannel.socket().setSoTimeout((int)getTimeout());
+ socketChannel.socket().setSoLinger(false,0);
if ( current != null ) key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
return false;
} else {
@@ -109,7 +114,7 @@
//if not, we are ready, setMessage will reregister us for another write interest
//do a health check, we have no way of verify a disconnected
//socket since we don't register for OP_READ on waitForAck=false
- read(key);//this causes overhead.
+ read(key);//this causes overhead
setRequestCount(getRequestCount()+1);
return true;
}
@@ -216,14 +221,12 @@
setConnected(false);
if ( socketChannel != null ) {
try {
- Socket socket = null;
- //socket = socketChannel.socket();
+ try {socketChannel.socket().close();}catch ( Exception x){}
//error free close, all the way
//try {socket.shutdownOutput();}catch ( Exception x){}
//try {socket.shutdownInput();}catch ( Exception x){}
//try {socket.close();}catch ( Exception x){}
try {socketChannel.close();}catch ( Exception x){}
- socket = null;
}finally {
socketChannel = null;
}
@@ -232,7 +235,6 @@
log.error("Unable to disconnect NioSender. msg="+x.getMessage());
if ( log.isDebugEnabled() ) log.debug("Unable to disconnect NioSender. msg="+x.getMessage(),x);
} finally {
- reset();
}
}
@@ -264,11 +266,11 @@
* @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method
*/
public synchronized void setMessage(byte[] data) throws IOException {
- reset();
if ( data != null ) {
current = data;
remaining = current.length;
curPos = 0;
+ ackbuf.clear();
if (isConnected()) {
socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this);
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Thu Mar 23 00:12:41 2006
@@ -33,6 +33,7 @@
import org.apache.catalina.tribes.tcp.AbstractSender;
import java.net.UnknownHostException;
import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.tipis.RpcChannel;
/**
* <p>Title: </p>
@@ -56,6 +57,7 @@
public ParallelNioSender() throws IOException {
selector = Selector.open();
+ setConnected(true);
}
@@ -67,16 +69,29 @@
setData(senders,data);
int remaining = senders.length;
+ ChannelException cx = null;
try {
//loop until complete, an error happens, or we timeout
long delta = System.currentTimeMillis() - start;
+ boolean waitForAck = (Channel.SEND_OPTIONS_USE_ACK & msg.getOptions()) == Channel.SEND_OPTIONS_USE_ACK;
while ( (remaining>0) && (delta<getTimeout()) ) {
- remaining -= doLoop(selectTimeout,getMaxRetryAttempts(),(Channel.SEND_OPTIONS_USE_ACK&msg.getOptions())==Channel.SEND_OPTIONS_USE_ACK);
+ try {
+ remaining -= doLoop(selectTimeout, getMaxRetryAttempts(),waitForAck);
+ } catch (Exception x ) {
+ if ( cx == null ) {
+ if ( x instanceof ChannelException ) cx = (ChannelException)x;
+ else cx = new ChannelException("Parallel NIO send failed.", x);
+ } else {
+ if (x instanceof ChannelException) cx.addFaultyMember( ( (ChannelException) x).getFaultyMembers());
+ }
+ }
+ //bail out if all remaining senders are failing
+ if ( cx != null && cx.getFaultyMembers().length == remaining ) throw cx;
delta = System.currentTimeMillis() - start;
}
if ( remaining > 0 ) {
//timeout has occured
- ChannelException cx = new ChannelException("Operation has timed out("+getTimeout()+" ms.).");
+ cx = new ChannelException("Operation has timed out("+getTimeout()+" ms.).");
for (int i=0; i<senders.length; i++ ) {
if (!senders[i].isComplete() ) cx.addFaultyMember(senders[i].getDestination());
}
@@ -107,20 +122,35 @@
NioSender sender = (NioSender) sk.attachment();
try {
if (sender.process(sk,waitForAck)) {
- sender.reset();
completed++;
sender.setComplete(true);
SenderState.getSenderState(sender.getDestination()).setReady();
}//end if
} catch (Exception x) {
SenderState state = SenderState.getSenderState(sender.getDestination());
- if ( state.isReady() ) {
- log.warn("Member send is failing for:"+sender.getDestination().getName()+"; Setting to suspect.");
+ int attempt = sender.getAttempt()+1;
+ boolean retry = (sender.getAttempt() <= maxAttempts && maxAttempts>0);
+ synchronized (state) {
+
+ //sk.cancel();
+ if (state.isSuspect()) state.setFailing();
+ if (state.isReady()) {
+ state.setSuspect();
+ if ( retry )
+ log.warn("Member send is failing for:" + sender.getDestination().getName() +" ; Setting to suspect and retrying.");
+ else
+ log.warn("Member send is failing for:" + sender.getDestination().getName() +" ; Setting to suspect.", x);
+ }
}
- state.setSuspect();
+ if ( !isConnected() ) {
+ log.warn("Not retrying send for:" + sender.getDestination().getName() + "; Sender is disconnected.");
+ ChannelException cx = new ChannelException("Send failed, and sender is disconnected. Not retrying.",x);
+ cx.addFaultyMember(sender.getDestination());
+ throw cx;
+ }
+
byte[] data = sender.getMessage();
- int attempt = sender.getAttempt()+1;
- if ( sender.getAttempt() <= maxAttempts && maxAttempts>0 ) {
+ if ( retry ) {
try {
sender.disconnect();
sender.connect();
@@ -232,8 +262,9 @@
public synchronized void disconnect() {
- try {close(); }catch (Exception x){}
setConnected(false);
+ try {close(); }catch (Exception x){}
+
}
public void finalize() {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Thu Mar 23 00:12:41 2006
@@ -135,26 +135,26 @@
//transfer state from another map
transferState();
- broadcast(MapMessage.MSG_START,true);
+ try {
+ broadcast(MapMessage.MSG_START, true);
+ } catch (ChannelException x) {
+ log.warn("Unable to send map start message.");
+ }
}
- private void broadcast(int msgtype, boolean rpc) {
- try {
- //send out a map membership message, only wait for the first reply
- MapMessage msg = new MapMessage(this.mapContextName, msgtype,
- false, null, null, null, wrap(channel.getLocalMember(false)));
- if ( rpc) {
- Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
- for (int i = 0; i < resp.length; i++) {
- mapMemberAdded(resp[i].getSource());
- messageReceived(resp[i].getMessage(), resp[i].getSource());
- }
- } else {
- channel.send(channel.getMembers(),msg,channelSendOptions);
+ private void broadcast(int msgtype, boolean rpc) throws ChannelException {
+ //send out a map membership message, only wait for the first reply
+ MapMessage msg = new MapMessage(this.mapContextName, msgtype,
+ false, null, null, null, wrap(channel.getLocalMember(false)));
+ if ( rpc) {
+ Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
+ for (int i = 0; i < resp.length; i++) {
+ mapMemberAdded(resp[i].getSource());
+ messageReceived(resp[i].getMessage(), resp[i].getSource());
}
- } catch (ChannelException x) {
- log.warn("Unable to send map start message.");
+ } else {
+ channel.send(channel.getMembers(),msg,channelSendOptions);
}
}
@@ -163,7 +163,7 @@
}
public void finalize() {
- broadcast(MapMessage.MSG_STOP,false);
+ try {broadcast(MapMessage.MSG_STOP,false); }catch ( Exception ignore){}
//cleanup
if (this.rpcChannel != null) {
this.rpcChannel.breakdown();
@@ -293,14 +293,14 @@
* @param msg Serializable
* @return Serializable - null if no reply should be sent
*/
- public Serializable replyRequest(Serializable msg, Member sender) {
+ public Serializable replyRequest(Serializable msg, final Member sender) {
if (! (msg instanceof MapMessage))return null;
MapMessage mapmsg = (MapMessage) msg;
//map start request
if (mapmsg.getMsgType() == mapmsg.MSG_START) {
- mapMemberAdded(sender);
mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
+ mapMemberAdded(sender);
return mapmsg;
}
@@ -417,12 +417,12 @@
}
} else {
if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
- diff.setOwner(getMapOwner());
+ ((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner());
} //end if
} else if (mapmsg.getValue() instanceof ReplicatedMapEntry) {
ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue();
re.setOwner(getMapOwner());
- if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
+ entry.setValue(re);
} else {
if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
} //end if
@@ -439,28 +439,35 @@
}
public void mapMemberAdded(Member member) {
- if ( member.equals(getChannel().getLocalMember(false)) ) return;
+ if ( member.equals(getChannel().getLocalMember(false)) ) return;
+ boolean memberAdded = false;
//select a backup node if we don't have one
synchronized (mapMembers) {
- if (!mapMembers.contains(member) ) mapMembers.add(member);
+ if (!mapMembers.contains(member) ) {
+ mapMembers.add(member);
+ memberAdded = true;
+ }
}
- synchronized (stateMutex) {
- Iterator i = super.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry e = (Map.Entry) i.next();
- MapEntry entry = (MapEntry) e.getValue();
- if ( entry == null ) continue;
- if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) {
- try {
- Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
- entry.setBackupNodes(backup);
- } catch (ChannelException x) {
- log.error("Unable to select backup node.", x);
- } //catch
- } //end if
- } //while
- } //synchronized
-
+ if ( memberAdded ) {
+ printMap("BEFORE MEMBER ADD");
+ synchronized (stateMutex) {
+ Iterator i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ MapEntry entry = (MapEntry) e.getValue();
+ if ( entry == null ) continue;
+ if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) {
+ try {
+ Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
+ entry.setBackupNodes(backup);
+ } catch (ChannelException x) {
+ log.error("Unable to select backup node.", x);
+ } //catch
+ } //end if
+ } //while
+ } //synchronized
+ printMap("AFTER MEMBER ADD");
+ }//end if
}
public boolean inSet(Member m, Member[] set) {
@@ -471,16 +478,27 @@
return result;
}
+ public Member[] excludeFromSet(Member[] mbrs, Member[] set) {
+ ArrayList result = new ArrayList();
+ for (int i=0; i<set.length; i++ ) {
+ boolean include = true;
+ for (int j=0; j<mbrs.length; j++ )
+ if ( mbrs[j].equals(set[i]) ) include = false;
+ if ( include ) result.add(set[i]);
+ }
+ return (Member[])result.toArray(new Member[result.size()]);
+ }
+
public void memberAdded(Member member) {
//do nothing
}
public void memberDisappeared(Member member) {
+ printMap("BEFORE MEMBER GONE");
+ boolean removed = false;
synchronized (mapMembers) {
- mapMembers.remove(member);
+ removed = mapMembers.remove(member);
}
- //todo move all sessions that are primary here to and have this member as
- //a backup
Iterator i = super.entrySet().iterator();
while (i.hasNext()) {
Map.Entry e = (Map.Entry) i.next();
@@ -494,6 +512,7 @@
}
} //end if
} //while
+ printMap("AFTER MEMBER GONE");
}
int currentNode = 0;
@@ -547,8 +566,9 @@
private Serializable value;
public MapEntry(Serializable key, Serializable value) {
- this.key = key;
- this.value = value;
+ setKey(key);
+ setValue(value);
+
}
public boolean isBackup() {
@@ -591,11 +611,21 @@
public Object setValue(Object value) {
Object old = this.value;
this.value = (Serializable) value;
+ if ( value==null ) {
+ Exception x = new Exception(this.toString());
+ x.printStackTrace();
+ }
return old;
}
public Object getKey() {
return key;
+ }
+
+ public Object setKey(Object key) {
+ Object old = this.key;
+ this.key = (Serializable)key;
+ return old;
}
public int hashCode() {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Thu Mar 23 00:12:41 2006
@@ -125,27 +125,33 @@
*/
protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException {
//select a backup node
- Member backup = getNextBackupNode();
+ Member next = getNextBackupNode();
- if ( backup == null ) return null;
+ if ( next == null ) return null;
+ Member[] backup = wrap(next);
+ MapMessage msg = null;
//publish the data out to all nodes
- MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false,
- (Serializable) key, null, null, wrap(backup));
- getChannel().send(getMapMembers(), msg,getChannelSendOptions());
-
+ Member[] proxies = excludeFromSet(backup,getMapMembers());
+ if ( proxies.length > 0 ) {
+ msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false,
+ (Serializable) key, null, null, backup);
+ getChannel().send(proxies, msg, getChannelSendOptions());
+ }
//publish the backup data to one node
msg = new MapMessage(getMapContextName(), MapMessage.MSG_BACKUP, false,
- (Serializable) key, (Serializable) value, null, wrap(backup));
- getChannel().send(new Member[] {backup}, msg, getChannelSendOptions());
- return wrap(backup);
+ (Serializable) key, (Serializable) value, null, backup);
+ getChannel().send(backup, msg, getChannelSendOptions());
+ return backup;
}
public Object get(Object key) {
MapEntry entry = (MapEntry)super.get(key);
+ if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" entry:"+entry);
System.out.println("Requesting id:"+key+" entry:"+entry);
if ( entry == null ) return null;
if ( !entry.isPrimary() ) {
+ if ( entry.isProxy() ) System.out.println("PROXY:Requesting id:"+key+" entry:"+entry);
//if the message is not primary, we need to retrieve the latest value
try {
Member[] backup = null;
@@ -188,7 +194,7 @@
return null;
}
}
- System.out.println("Requesting id:"+key+" result:"+entry.getValue());
+ if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" result:"+entry.getValue());
return entry.getValue();
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java Thu Mar 23 00:12:41 2006
@@ -101,7 +101,6 @@
return collector.getResponses();
}
-
public void messageReceived(Serializable msg, Member sender) {
RpcMessage rmsg = (RpcMessage)msg;
RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid);
Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java Thu Mar 23 00:12:41 2006
@@ -145,7 +145,7 @@
Thread.sleep(pause);
}
} catch (ChannelException x) {
- log.error("Unable to send message:"+x.getMessage());
+ log.error("Unable to send message:"+x.getMessage(),x);
Member[] faulty = x.getFaultyMembers();
for (int i=0; i<faulty.length; i++ ) log.error("Faulty: "+faulty[i]);
--counter;
Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java Thu Mar 23 00:12:41 2006
@@ -49,9 +49,9 @@
table = SimpleTableDemo.createAndShowGUI(map,channel.getLocalMember(false).getName());
channel.addChannelListener(this);
channel.addMembershipListener(this);
- for ( int i=0; i<1000; i++ ) {
- map.put("MyKey-"+i,"My String Value-"+i);
- }
+// for ( int i=0; i<1000; i++ ) {
+// map.put("MyKey-"+i,"My String Value-"+i);
+// }
this.messageReceived(null,null);
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=388098&r1=388097&r2=388098&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Thu Mar 23 00:12:41 2006
@@ -27,6 +27,11 @@
8. Future version will have WAN membership and replication
+Bugs:
+===========================================
+ a) Somehow the first NIO connection made, always closes down, why
+ b) State synchronization for the map
+
Code Tasks:
===========================================
32. Replicated JNDI entries in Tomcat in the format
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org