You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/03 01:04:22 UTC
svn commit: r382577 - in
/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes:
mcast/McastMember.java tcp/ReplicationTransmitter.java
tcp/nio/NioSender.java tcp/nio/ParallelNioSender.java
tcp/nio/PooledParallelSender.java
Author: fhanik
Date: Thu Mar 2 16:04:20 2006
New Revision: 382577
URL: http://svn.apache.org/viewcvs?rev=382577&view=rev
Log:
First version of the parallel sender is complete
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java?rev=382577&r1=382576&r2=382577&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java Thu Mar 2 16:04:20 2006
@@ -181,7 +181,6 @@
System.arraycopy(domaind,0,data,20,domaind.length);
dataPkg = data;
- System.out.println("McastMember.getData all the way");
return data;
}
/**
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=382577&r1=382576&r2=382577&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java Thu Mar 2 16:04:20 2006
@@ -29,6 +29,7 @@
import org.apache.catalina.tribes.util.IDynamicProperty;
import org.apache.catalina.util.StringManager;
import org.apache.tomcat.util.IntrospectionUtils;
+import org.apache.catalina.tribes.tcp.nio.PooledParallelSender;
/**
* Transmit message to other cluster members
@@ -154,7 +155,7 @@
/**
* @return The ack timeout
*/
- public long getTimeout() {
+ public long getAckTimeout() {
return ackTimeout;
}
@@ -267,17 +268,38 @@
* @see org.apache.catalina.tribes.ClusterSender#sendMessage(org.apache.catalina.tribes.ClusterMessage, org.apache.catalina.tribes.Member)
*/
public void sendMessage(ChannelMessage message, Member[] destination) throws ChannelException {
- ChannelException exception = null;
- for (int i = 0; i < destination.length; i++) {
- try {
- sendMessage(message, destination[i]);
- } catch (Exception x) {
- if (exception == null) exception = new ChannelException(x);
- exception.addFaultyMember(destination[i]);
+ if ( !isParallel() ) {
+ ChannelException exception = null;
+ for (int i = 0; i < destination.length; i++) {
+ try {
+ sendMessage(message, destination[i]);
+ } catch (Exception x) {
+ if (exception == null) exception = new ChannelException(x);
+ exception.addFaultyMember(destination[i]);
+ }
}
+ if (exception != null)throw exception;
+ } else {
+ MultiPointSender sender = getParallelSender();
+ sender.sendMessage(destination,message);
}
- if (exception != null)throw exception;
+ }
+
+ PooledParallelSender parallelsender = null;
+ public MultiPointSender getParallelSender() {
+ if ( parallelsender == null ) {
+
+ PooledParallelSender sender = new PooledParallelSender();
+ sender.setMaxRetryAttempts(2);
+ sender.setRxBufSize(getRxBufSize());
+ sender.setTimeout(ackTimeout);
+ sender.setUseDirectBuffer(true);
+ sender.setWaitForAck(getWaitForAck());
+ sender.setTxBufSize(getTxBufSize());
+ parallelsender = sender;
+ }
+ return parallelsender;
}
public void sendMessage(ChannelMessage message, Member destination) throws ChannelException {
@@ -365,14 +387,16 @@
*/
public synchronized void add(Member member) {
try {
- Object key = getKey(member);
- if (!map.containsKey(key)) {
- SinglePointSender sender = DataSenderFactory.getSingleSender(replicationMode, member);
- if ( sender!= null ) {
- transferSenderProperty(sender);
- sender.setRxBufSize(getRxBufSize());
- sender.setTxBufSize(getTxBufSize());
- map.put(key, sender);
+ if ( !isParallel() ) {
+ Object key = getKey(member);
+ if (!map.containsKey(key)) {
+ SinglePointSender sender = DataSenderFactory.getSingleSender(replicationMode, member);
+ if (sender != null) {
+ transferSenderProperty(sender);
+ sender.setRxBufSize(getRxBufSize());
+ sender.setTxBufSize(getTxBufSize());
+ map.put(key, sender);
+ }
}
}
} catch (java.io.IOException x) {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java?rev=382577&r1=382576&r2=382577&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java Thu Mar 2 16:04:20 2006
@@ -189,7 +189,6 @@
socketChannel.configureBlocking(false);
socketChannel.connect(addr);
socketChannel.register(getSelector(),SelectionKey.OP_CONNECT,this);
- this.connected = true;
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=382577&r1=382576&r2=382577&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Thu Mar 2 16:04:20 2006
@@ -77,6 +77,7 @@
NioSender[] senders = setupForSend(destination);
connect(senders);
setData(senders,data);
+
int remaining = senders.length;
try {
//loop until complete, an error happens, or we timeout
@@ -103,9 +104,11 @@
private int doLoop(long selectTimeOut, int maxAttempts) throws IOException, ChannelException {
int completed = 0;
int selectedKeys = selector.select(selectTimeOut);
+
if (selectedKeys == 0) {
return 0;
}
+
Iterator it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey sk = (SelectionKey) it.next();
@@ -261,7 +264,8 @@
}
public boolean checkKeepAlive() {
- throw new UnsupportedOperationException("Method ParallelNioSender.checkKeepAlive() not implemented");
+ //throw new UnsupportedOperationException("Method ParallelNioSender.checkKeepAlive() not implemented");
+ return false;
}
}
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java?rev=382577&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java Thu Mar 2 16:04:20 2006
@@ -0,0 +1,78 @@
+package org.apache.catalina.tribes.tcp.nio;
+
+import org.apache.catalina.tribes.tcp.PooledSender;
+import org.apache.catalina.tribes.tcp.DataSender;
+import org.apache.catalina.tribes.tcp.MultiPointSender;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
+import java.io.IOException;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class PooledParallelSender extends PooledSender implements MultiPointSender{
+ private boolean suspect;
+ private boolean useDirectBuffer;
+ private int maxRetryAttempts;
+
+ public PooledParallelSender() {
+ super(25);
+ }
+ public void sendMessage(Member[] destination, ChannelMessage message) throws ChannelException {
+ ParallelNioSender sender = (ParallelNioSender)getSender();
+ try {
+ sender.sendMessage(destination, message);
+ }finally {
+ returnSender(sender);
+ }
+ }
+
+ public DataSender getNewDataSender() {
+ try {
+ ParallelNioSender sender =
+ new ParallelNioSender(getTimeout(),
+ getWaitForAck(),
+ getMaxRetryAttempts(),
+ useDirectBuffer,
+ getRxBufSize(),
+ getTxBufSize());
+ return sender;
+ } catch ( IOException x ) {
+ throw new IllegalStateException("Unable to open NIO selector.",x);
+ }
+ }
+
+ public void setSuspect(boolean suspect) {
+ this.suspect = suspect;
+ }
+
+ public void setUseDirectBuffer(boolean useDirectBuffer) {
+ this.useDirectBuffer = useDirectBuffer;
+ }
+
+ public void setMaxRetryAttempts(int maxRetryAttempts) {
+ this.maxRetryAttempts = maxRetryAttempts;
+ }
+
+ public boolean getSuspect() {
+ return suspect;
+ }
+
+ public boolean getUseDirectBuffer() {
+ return useDirectBuffer;
+ }
+
+ public int getMaxRetryAttempts() {
+ return maxRetryAttempts;
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org