You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/05/02 19:28:14 UTC
svn commit: r398970 - in /tomcat/container/tc5.5.x/modules/groupcom: ./
src/share/org/apache/catalina/tribes/
src/share/org/apache/catalina/tribes/membership/
src/share/org/apache/catalina/tribes/transport/
src/share/org/apache/catalina/tribes/transpor...
Author: fhanik
Date: Tue May 2 10:28:06 2006
New Revision: 398970
URL: http://svn.apache.org/viewcvs?rev=398970&view=rev
Log:
Fix shutdown message, wasnt getting broadcasted since it caches the serialized data for speed purposes
Started implementing the fail ack command so that we can have a acknowledgement if the message was handled successfully or not
Refactored channel exception so that we can track what exact exception a member send was exposed to
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/RemoteProcessException.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelException.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/Constants.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelException.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelException.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelException.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelException.java Tue May 2 10:28:06 2006
@@ -57,29 +57,45 @@
return buf.toString();
}
- public void addFaultyMember(Member[] mbrs) {
+ public void addFaultyMember(Member mbr, Exception x ) {
+ addFaultyMember(new FaultyMember(mbr,x));
+ }
+
+ public void addFaultyMember(FaultyMember[] mbrs) {
for (int i=0; mbrs!=null && i<mbrs.length; i++ ) {
addFaultyMember(mbrs[i]);
}
}
- public void addFaultyMember(Member mbr) {
+ public void addFaultyMember(FaultyMember mbr) {
if ( this.faultyMembers==null ) this.faultyMembers = new ArrayList();
faultyMembers.add(mbr);
}
- public void setFaultyMembers(ArrayList faultyMembers) {
- this.faultyMembers = faultyMembers;
+ public FaultyMember[] getFaultyMembers() {
+ if ( this.faultyMembers==null ) return new FaultyMember[0];
+ return (FaultyMember[])faultyMembers.toArray(new FaultyMember[faultyMembers.size()]);
}
-
- public void setFaultyMembers(Member[] faultyMembers) {
- if ( this.faultyMembers==null ) this.faultyMembers = new ArrayList();
- this.faultyMembers.addAll(Arrays.asList(faultyMembers));
- }
-
- public Member[] getFaultyMembers() {
- if ( this.faultyMembers==null ) return new Member[0];
- return (Member[])faultyMembers.toArray(new Member[faultyMembers.size()]);
+
+ public static class FaultyMember {
+ protected Exception cause;
+ protected Member member;
+ public FaultyMember(Member mbr, Exception x) {
+ this.member = mbr;
+ this.cause = x;
+ }
+
+ public Member getMember() {
+ return member;
+ }
+
+ public Exception getCause() {
+ return cause;
+ }
+
+ public String toString() {
+ return "FaultyMember:"+member.toString();
+ }
}
}
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/RemoteProcessException.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/RemoteProcessException.java?rev=398970&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/RemoteProcessException.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/RemoteProcessException.java Tue May 2 10:28:06 2006
@@ -0,0 +1,46 @@
+/*
+ * Copyright 1999,2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.catalina.tribes;
+
+/**
+ * <p>Title: RemoteProcessException</p>
+ *
+ * <p>Description: Message thrown by a sender when USE_SYNC_ACK receives a FAIL_ACK_COMMAND.<br>
+ * This means that the message was received on the remote node but the processing of the message failed.
+ * </p>
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class RemoteProcessException
+ extends RuntimeException {
+ public RemoteProcessException() {
+ super();
+ }
+
+ public RemoteProcessException(String message) {
+ super(message);
+ }
+
+ public RemoteProcessException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public RemoteProcessException(Throwable cause) {
+ super(cause);
+ }
+
+}
\ No newline at end of file
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java Tue May 2 10:28:06 2006
@@ -214,9 +214,12 @@
//send a stop message
byte[] payload = member.getPayload();
member.setPayload(STOP_PAYLOAD);
+ member.getData(true,true);
send();
//restore payload
member.setPayload(payload);
+ member.getData(true,true);
+ //leave mcast group
socket.leaveGroup(address);
serviceStartTime = Long.MAX_VALUE;
}
@@ -236,7 +239,7 @@
log.debug("Mcast receive ping from member " + m);
if (Arrays.equals(m.getPayload(), STOP_PAYLOAD)) {
- if (log.isInfoEnabled()) log.info("Member has shutdown:" + m);
+ if (log.isDebugEnabled()) log.debug("Member has shutdown:" + m);
membership.removeMcastMember(m);
service.memberDisappeared(m);
} else if (membership.memberAlive(m)) {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java Tue May 2 10:28:06 2006
@@ -173,6 +173,11 @@
* @return byte[]
*/
public byte[] getData(boolean getalive) {
+ return getData(getalive,false);
+ }
+
+ public byte[] getData(boolean getalive, boolean reset) {
+ if ( reset ) dataPkg = null;
//look in cache first
if ( dataPkg!=null ) {
if ( getalive ) {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/Constants.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/Constants.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/Constants.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/Constants.java Tue May 2 10:28:06 2006
@@ -22,7 +22,7 @@
/**
* Manifest constants for the <code>org.apache.catalina.tribes.transport</code>
* package.
- *
+ * @author Filip Hanik
* @author Peter Rossbach
* @version $Revision: 303753 $ $Date: 2005-03-14 15:24:30 -0600 (Mon, 14 Mar 2005) $
*/
@@ -31,7 +31,12 @@
public static final String Package = "org.apache.catalina.tribes.transport";
+ /*
+ * Do not change any of these values!
+ */
public static final byte[] ACK_DATA = new byte[] {6, 2, 3};
+ public static final byte[] FAIL_ACK_DATA = new byte[] {11, 0, 5};
public static final byte[] ACK_COMMAND = XByteBuffer.createDataPackage(ACK_DATA);
+ public static final byte[] FAIL_ACK_COMMAND = XByteBuffer.createDataPackage(FAIL_ACK_DATA);
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java Tue May 2 10:28:06 2006
@@ -102,15 +102,20 @@
* server before completing the request
* This is considered an asynchronized request
*/
- if (ClusterData.sendAckAsync(msgs[i].getOptions())) sendAck();
- //process the message
- getCallback().messageDataReceived(msgs[i]);
- /**
- * Use send ack here if you want the request to complete on this
- * server before sending the ack to the remote server
- * This is considered a synchronized request
- */
- if (ClusterData.sendAckSync(msgs[i].getOptions())) sendAck();
+ if (ClusterData.sendAckAsync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND);
+ try {
+ //process the message
+ getCallback().messageDataReceived(msgs[i]);
+ /**
+ * Use send ack here if you want the request to complete on this
+ * server before sending the ack to the remote server
+ * This is considered a synchronized request
+ */
+ if (ClusterData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND);
+ }catch ( Exception x ) {
+ if (ClusterData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.FAIL_ACK_COMMAND);
+ log.error("Error thrown from messageDataReceived.",x);
+ }
}
}
@@ -145,10 +150,10 @@
* @param key
* @param channel
*/
- protected void sendAck() {
+ protected void sendAck(byte[] command) {
try {
OutputStream out = socket.getOutputStream();
- out.write(Constants.ACK_COMMAND);
+ out.write(command);
out.flush();
if (log.isTraceEnabled()) {
log.trace("ACK sent to " + socket.getPort());
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java Tue May 2 10:28:06 2006
@@ -32,6 +32,7 @@
import org.apache.catalina.tribes.transport.SenderState;
import org.apache.catalina.tribes.util.StringManager;
import org.apache.catalina.tribes.transport.AbstractSender;
+import org.apache.catalina.tribes.RemoteProcessException;
/**
* Send cluster messages with only one socket. Ack and keep Alive Handling is
@@ -272,6 +273,7 @@
protected void waitForAck() throws java.io.IOException {
try {
boolean ackReceived = false;
+ boolean failAckReceived = false;
ackbuf.clear();
int bytesRead = 0;
int i = soIn.read();
@@ -280,7 +282,10 @@
byte d = (byte)i;
ackbuf.append(d);
if (ackbuf.doesPackageExist() ) {
- ackReceived = Arrays.equals(ackbuf.extractDataPackage(true),Constants.ACK_DATA);
+ byte[] ackcmd = ackbuf.extractDataPackage(true);
+ ackReceived = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.ACK_DATA);
+ failAckReceived = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA);
+ ackReceived = ackReceived || failAckReceived;
break;
}
i = soIn.read();
@@ -288,6 +293,8 @@
if (!ackReceived) {
if (i == -1) throw new IOException(sm.getString("IDataSender.ack.eof",getAddress(), new Integer(socket.getLocalPort())));
else throw new IOException(sm.getString("IDataSender.ack.wrong",getAddress(), new Integer(socket.getLocalPort())));
+ } else if ( failAckReceived ) {
+ throw new RemoteProcessException("Received a failed ack:org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA");
}
} catch (IOException x) {
String errmsg = sm.getString("IDataSender.ack.missing", getAddress(),new Integer(socket.getLocalPort()), new Long(getTimeout()));
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java Tue May 2 10:28:06 2006
@@ -34,7 +34,6 @@
private boolean autoConnect;
public synchronized void sendMessage(Member[] destination, ChannelMessage msg) throws ChannelException {
- long start = System.currentTimeMillis();
byte[] data = XByteBuffer.createDataPackage((ClusterData)msg);
BioSender[] senders = setupForSend(destination);
ChannelException cx = null;
@@ -43,7 +42,7 @@
senders[i].sendMessage(data,(msg.getOptions()&Channel.SEND_OPTIONS_USE_ACK)==Channel.SEND_OPTIONS_USE_ACK);
} catch (Exception x) {
if (cx == null) cx = new ChannelException(x);
- cx.addFaultyMember(destination[i]);
+ cx.addFaultyMember(destination[i],x);
}
}
if (cx!=null ) throw cx;
@@ -71,7 +70,7 @@
result[i].keepalive();
}catch (Exception x ) {
if ( cx== null ) cx = new ChannelException(x);
- cx.addFaultyMember(destination[i]);
+ cx.addFaultyMember(destination[i],x);
}
}
if ( cx!=null ) throw cx;
@@ -94,7 +93,7 @@
sender.disconnect();
}catch ( Exception e ) {
if ( x == null ) x = new ChannelException(e);
- x.addFaultyMember(mbr);
+ x.addFaultyMember(mbr,e);
}
bioSenders.remove(mbr);
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java Tue May 2 10:28:06 2006
@@ -31,7 +31,7 @@
sender = (MultiPointSender)getSender();
if (sender == null) {
ChannelException cx = new ChannelException("Unable to retrieve a data sender, time out error.");
- for (int i = 0; i < destination.length; i++) cx.addFaultyMember(destination[i]);
+ for (int i = 0; i < destination.length; i++) cx.addFaultyMember(destination[i], new NullPointerException("Unable to retrieve a sender from the sender pool"));
throw cx;
} else {
sender.sendMessage(destination, msg);
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Tue May 2 10:28:06 2006
@@ -152,20 +152,20 @@
* server before completing the request
* This is considered an asynchronized request
*/
- if (ClusterData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel);
+ if (ClusterData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
try {
//process the message
getCallback().messageDataReceived(msgs[i]);
+ /**
+ * Use send ack here if you want the request to complete on this
+ * server before sending the ack to the remote server
+ * This is considered a synchronized request
+ */
+ if (ClusterData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
}catch ( Exception e ) {
log.error("Processing of cluster message failed.",e);
+ if (ClusterData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
}
- /**
- * Use send ack here if you want the request to complete on this
- * server before sending the ack to the remote server
- * This is considered a synchronized request
- */
-
- if (ClusterData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel);
}
}
@@ -200,10 +200,10 @@
* @param key
* @param channel
*/
- protected void sendAck(SelectionKey key, SocketChannel channel) {
+ protected void sendAck(SelectionKey key, SocketChannel channel, byte[] command) {
try {
- channel.write(ByteBuffer.wrap(Constants.ACK_COMMAND));
+ channel.write(ByteBuffer.wrap(command));
if (log.isTraceEnabled()) {
log.trace("ACK sent to " + channel.socket().getPort());
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java Tue May 2 10:28:06 2006
@@ -29,6 +29,7 @@
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.transport.AbstractSender;
import org.apache.catalina.tribes.transport.DataSender;
+import org.apache.catalina.tribes.RemoteProcessException;
/**
* This class is NOT thread safe and should never be used with more than one thread at a time
@@ -156,8 +157,11 @@
ackbuf.append(readbuf,read);
readbuf.clear();
if (ackbuf.doesPackageExist() ) {
- boolean result = Arrays.equals(ackbuf.extractDataPackage(true),org.apache.catalina.tribes.transport.Constants.ACK_DATA);
- return result;
+ byte[] ackcmd = ackbuf.extractDataPackage(true);
+ boolean ack = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.ACK_DATA);
+ boolean fack = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA);
+ if ( fack ) throw new RemoteProcessException("Received a failed ack:org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA");
+ return ack || fack;
} else {
return false;
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java Tue May 2 10:28:06 2006
@@ -93,7 +93,7 @@
//timeout has occured
cx = new ChannelException("Operation has timed out("+getTimeout()+" ms.).");
for (int i=0; i<senders.length; i++ ) {
- if (!senders[i].isComplete() ) cx.addFaultyMember(senders[i].getDestination());
+ if (!senders[i].isComplete() ) cx.addFaultyMember(senders[i].getDestination(),null);
}
throw cx;
}
@@ -145,7 +145,7 @@
if ( !isConnected() ) {
log.warn("Not retrying send for:" + sender.getDestination().getName() + "; Sender is disconnected.");
ChannelException cx = new ChannelException("Send failed, and sender is disconnected. Not retrying.",x);
- cx.addFaultyMember(sender.getDestination());
+ cx.addFaultyMember(sender.getDestination(),x);
throw cx;
}
@@ -161,7 +161,7 @@
}
} else {
ChannelException cx = new ChannelException("Send failed, attempt:"+sender.getAttempt()+" max:"+maxAttempts,x);
- cx.addFaultyMember(sender.getDestination());
+ cx.addFaultyMember(sender.getDestination(),x);
throw cx;
}//end if
}
@@ -177,7 +177,7 @@
if (!senders[i].isConnected()) senders[i].connect();
}catch ( IOException io ) {
if ( x==null ) x = new ChannelException(io);
- x.addFaultyMember(senders[i].getDestination());
+ x.addFaultyMember(senders[i].getDestination(),io);
}
}
if ( x != null ) throw x;
@@ -190,7 +190,7 @@
senders[i].setMessage(data);
}catch ( IOException io ) {
if ( x==null ) x = new ChannelException(io);
- x.addFaultyMember(senders[i].getDestination());
+ x.addFaultyMember(senders[i].getDestination(),io);
}
}
if ( x != null ) throw x;
@@ -208,7 +208,7 @@
nioSenders.put(destination[i], sender);
}catch ( UnknownHostException x ) {
if ( cx == null ) cx = new ChannelException("Unable to setup NioSender.",x);
- cx.addFaultyMember(destination[i]);
+ cx.addFaultyMember(destination[i],x);
}
}
if ( sender != null ) {
@@ -243,7 +243,7 @@
sender.disconnect();
}catch ( Exception e ) {
if ( x == null ) x = new ChannelException(e);
- x.addFaultyMember(mbr);
+ x.addFaultyMember(mbr,e);
}
nioSenders.remove(mbr);
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java Tue May 2 10:28:06 2006
@@ -149,7 +149,7 @@
} catch (ChannelException x) {
if ( debug ) log.error("Unable to send message:"+x.getMessage(),x);
log.error("Unable to send message:"+x.getMessage());
- Member[] faulty = x.getFaultyMembers();
+ ChannelException.FaultyMember[] faulty = x.getFaultyMembers();
for (int i=0; i<faulty.length; i++ ) log.error("Faulty: "+faulty[i]);
--counter;
if ( this.breakonChannelException ) throw x;
@@ -318,6 +318,7 @@
"[-size messagesize] \n\t\t"+
"[-sendoptions channeloptions] \n\t\t"+
"[-break (halts execution on exception)]\n"+
+ "[-shutdown (issues a channel.stop() command after send is completed)]\n"+
"\tChannel options:"+
ChannelCreator.usage()+"\n\n"+
"Example:\n\t"+
@@ -334,6 +335,7 @@
int stats = 10000;
boolean breakOnEx = false;
int threads = 1;
+ boolean shutdown = false;
int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
if ( args.length == 0 ) {
args = new String[] {"-help"};
@@ -348,6 +350,8 @@
pause = Long.parseLong(args[++i])*1000;
} else if ("-break".equals(args[i])) {
breakOnEx = true;
+ } else if ("-shutdown".equals(args[i])) {
+ shutdown = true;
} else if ("-stats".equals(args[i])) {
stats = Integer.parseInt(args[++i]);
System.out.println("Stats every "+stats+" message");
@@ -388,7 +392,7 @@
test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
}
test.run();
-
+ if ( shutdown && send ) channel.stop(channel.DEFAULT);
System.out.println("System test complete, sleeping to let threads finish.");
Thread.sleep(60*1000*60);
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Tue May 2 10:28:06 2006
@@ -48,8 +48,6 @@
41. Build a tipi that is a soft membership
-40. channel.stop() - should broadcast a stop message, to avoid timeout
-
38. Make the AbstractReplicatedMap accept non serializable elements, but just don't replicate them
36. UDP Sender and Receiver, initially without flow control and guaranteed delivery.
@@ -251,4 +249,7 @@
and no one accepts it, then it can reply immediately. this way the rpc sender doesn't have to time out.
39. Support for IPv6
-Notes: Completed. The membership now carries a variable length host address to support IPv6
\ No newline at end of file
+Notes: Completed. The membership now carries a variable length host address to support IPv6
+
+40. channel.stop() - should broadcast a stop message, to avoid timeout
+Notes: Completed.
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org