You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/14 06:44:48 UTC
svn commit: r385742 - in
/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes:
group/ group/interceptors/ io/ mcast/ tcp/nio/
Author: fhanik
Date: Mon Mar 13 21:44:46 2006
New Revision: 385742
URL: http://svn.apache.org/viewcvs?rev=385742&view=rev
Log:
Optimizing the interceptors
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastServiceImpl.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=385742&r1=385741&r2=385742&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java Mon Mar 13 21:44:46 2006
@@ -37,6 +37,7 @@
private ChannelReceiver clusterReceiver;
private ChannelSender clusterSender;
private MembershipService membershipService;
+ private boolean started = false;
public ChannelCoordinator() {
@@ -75,15 +76,18 @@
* SND_RX_SEQ - starts the replication receiver<BR>
* @throws ChannelException if a startup error occurs or the service is already started.
*/
- public void start(int svc) throws ChannelException {
+ public synchronized void start() throws ChannelException {
try {
+ if (started) return;
+ //must start the receiver first so that we can coordinate the port it
+ //listens to with the local membership settings
+ clusterReceiver.start();
+ clusterSender.start();
//synchronize, big time FIXME
membershipService.setLocalMemberProperties(getClusterReceiver().getHost(), getClusterReceiver().getPort());
- //end FIXME
- if ( (svc & Channel.SND_RX_SEQ) == Channel.SND_RX_SEQ) clusterReceiver.start();
- if ( (svc & Channel.SND_TX_SEQ) == Channel.SND_TX_SEQ) clusterSender.start();
- if ( (svc & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ) membershipService.start(MembershipService.MBR_RX);
- if ( (svc & Channel.MBR_TX_SEQ) == Channel.MBR_TX_SEQ) membershipService.start(MembershipService.MBR_TX);
+ membershipService.start(MembershipService.MBR_RX);
+ membershipService.start(MembershipService.MBR_TX);
+ this.started = true;
}catch ( ChannelException cx ) {
throw cx;
}catch ( Exception x ) {
@@ -102,14 +106,16 @@
* SND_RX_SEQ - starts the replication receiver<BR>
* @throws ChannelException if a startup error occurs or the service is already started.
*/
- public void stop(int svc) throws ChannelException {
+ public void stop() throws ChannelException {
try {
- if ( (svc & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ) membershipService.stop();
- if ( (svc & Channel.SND_RX_SEQ) == Channel.SND_RX_SEQ) clusterReceiver.stop();
- if ( (svc & Channel.SND_TX_SEQ) == Channel.SND_TX_SEQ) clusterSender.stop();
- if ( (svc & Channel.MBR_TX_SEQ) == Channel.MBR_RX_SEQ) membershipService.stop();
+ membershipService.stop();
+ clusterReceiver.stop();
+ clusterSender.stop();
+ membershipService.stop();
}catch ( Exception x ) {
throw new ChannelException(x);
+ } finally {
+ started = false;
}
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=385742&r1=385741&r2=385742&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Mon Mar 13 21:44:46 2006
@@ -173,7 +173,7 @@
* @throws ChannelException if a startup error occurs or the service is already started.
*/
public void start(int svc) throws ChannelException {
- coordinator.start(svc);
+ coordinator.start();
}
/**
@@ -188,7 +188,7 @@
* @throws ChannelException if a startup error occurs or the service is already started.
*/
public void stop(int svc) throws ChannelException {
- coordinator.stop(svc);
+ coordinator.stop();
}
public ChannelReceiver getChannelReceiver() {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?rev=385742&r1=385741&r2=385742&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java Mon Mar 13 21:44:46 2006
@@ -60,12 +60,14 @@
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
for ( int i=0; i<destination.length; i++ ) {
- ChannelMessage tmp = msg.clone();
int nr = incCounter(destination[i]);
//reduce byte copy
- //tmp.getMessage().append(XByteBuffer.toBytes(nr),0,4);
- tmp.getMessage().append(nr);
- getNext().sendMessage(new Member[] {destination[i]}, tmp, payload);
+ msg.getMessage().append(nr);
+ try {
+ getNext().sendMessage(new Member[] {destination[i]}, msg, payload);
+ }finally {
+ msg.getMessage().trim(4);
+ }
}
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java?rev=385742&r1=385741&r2=385742&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java Mon Mar 13 21:44:46 2006
@@ -212,9 +212,20 @@
} else return false;
}
+ /**
+ * Create a shallow clone, only the data gets recreated
+ * @return ClusterData
+ */
public ClusterData clone() {
- byte[] d = this.getDataPackage();
- return ClusterData.getDataFromPackage(d);
+// byte[] d = this.getDataPackage();
+// return ClusterData.getDataFromPackage(d);
+ ClusterData clone = new ClusterData(false);
+ clone.options = this.options;
+ clone.message = new XByteBuffer(this.message.getBytesDirect(),false);
+ clone.timestamp = this.timestamp;
+ clone.uniqueId = this.uniqueId;
+ clone.address = this.address;
+ return clone;
}
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java?rev=385742&r1=385741&r2=385742&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java Mon Mar 13 21:44:46 2006
@@ -207,7 +207,7 @@
int dlen = XByteBuffer.toInt(dlend, 0);
byte[] domaind = new byte[dlen];
System.arraycopy(data, 20, domaind, 0, domaind.length);
- member.setDomain(new String(domaind));
+ member.domain = domaind;
member.setHost(addr);
member.setPort(XByteBuffer.toInt(portd, 0));
member.setMemberAliveTime(XByteBuffer.toLong(alived, 0));
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastServiceImpl.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastServiceImpl.java?rev=385742&r1=385741&r2=385742&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastServiceImpl.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastServiceImpl.java Mon Mar 13 21:44:46 2006
@@ -210,6 +210,7 @@
McastMember m = McastMember.getMember(data);
if(log.isDebugEnabled())
log.debug("Mcast receive ping from member " + m);
+
if ( membership.memberAlive(m) ) {
if(log.isDebugEnabled())
log.debug("Mcast add member " + m);
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java?rev=385742&r1=385741&r2=385742&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java Mon Mar 13 21:44:46 2006
@@ -64,6 +64,7 @@
private int tcpThreadCount;
private long tcpSelectorTimeout;
private Selector selector = null;
+ private ServerSocketChannel serverChannel = null;
private java.net.InetAddress bind;
private String tcpListenAddress;
@@ -135,6 +136,7 @@
}
try {
getBind();
+ this.bind();
Thread t = new Thread(this, "NioReceiver");
t.setDaemon(true);
t.start();
@@ -142,31 +144,63 @@
log.fatal("Unable to start cluster receiver", x);
}
}
-
+
/**
- * get data from channel and store in byte array
- * send it to cluster
+ * recursive bind to find the next available port
+ * @param socket ServerSocket
+ * @param portstart int
+ * @param retries int
+ * @return int
* @throws IOException
- * @throws java.nio.channels.ClosedChannelException
*/
- protected void listen() throws Exception {
- if (doListen) {
- log.warn("ServerSocketChannel allready started");
- return;
+ protected int bind(ServerSocket socket, int portstart, int retries) throws IOException {
+ while ( retries > 0 ) {
+ try {
+ InetSocketAddress addr = new InetSocketAddress(getBind(), portstart);
+ socket.bind(addr);
+ setTcpListenPort(portstart);
+ log.info("Nio Server Socket bound to:"+addr);
+ return 0;
+ }catch ( IOException x) {
+ retries--;
+ if ( retries <= 0 ) throw x;
+ portstart++;
+ retries = bind(socket,portstart,retries);
+ }
}
- doListen = true;
+ return retries;
+ }
+
+ protected void bind() throws IOException {
// allocate an unbound server socket channel
- ServerSocketChannel serverChannel = ServerSocketChannel.open();
+ serverChannel = ServerSocketChannel.open();
// Get the associated ServerSocket to bind it with
ServerSocket serverSocket = serverChannel.socket();
// create a new Selector for use below
selector = Selector.open();
// set the port the server channel will listen to
- serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort()));
+ //serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort()));
+ bind(serverSocket,getTcpListenPort(),10);
// set non-blocking mode for the listening socket
serverChannel.configureBlocking(false);
// register the ServerSocketChannel with the Selector
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
+
+ }
+ /**
+ * get data from channel and store in byte array
+ * send it to cluster
+ * @throws IOException
+ * @throws java.nio.channels.ClosedChannelException
+ */
+ protected void listen() throws Exception {
+ if (doListen) {
+ log.warn("ServerSocketChannel allready started");
+ return;
+ }
+
+ doListen = true;
+
while (doListen && selector != null) {
// this may block for a long time, upon return the
// selected set contains keys of the ready channels
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org