You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/05/18 23:12:24 UTC
svn commit: r407634 - in /tomcat/container/tc5.5.x/modules/groupcom:
src/share/org/apache/catalina/tribes/group/
src/share/org/apache/catalina/tribes/group/interceptors/
src/share/org/apache/catalina/tribes/io/
src/share/org/apache/catalina/tribes/tran...
Author: fhanik
Date: Thu May 18 14:12:23 2006
New Revision: 407634
URL: http://svn.apache.org/viewvc?rev=407634&view=rev
Log:
The throughput interceptor now measures throughput correctly, taking both multithreading and multiple destinations into account.
Implemented a pool of buffers used by the group to send data down the stack.
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool15Impl.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=407634&r1=407633&r2=407634&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Thu May 18 14:12:23 2006
@@ -38,6 +38,7 @@
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.UniqueId;
import org.apache.catalina.tribes.Heartbeat;
+import org.apache.catalina.tribes.io.BufferPool;
/**
* The default implementation of a Channel.<br>
@@ -184,6 +185,7 @@
*/
public UniqueId send(Member[] destination, Serializable msg, int options, ErrorHandler handler) throws ChannelException {
if ( msg == null ) throw new ChannelException("Cant send a NULL message");
+ XByteBuffer buffer = null;
try {
if ( destination == null || destination.length == 0) throw new ChannelException("No destination given");
ChannelData data = new ChannelData(true);//generates a unique Id
@@ -198,7 +200,8 @@
options = options & (~SEND_OPTIONS_BYTE_MESSAGE);
}
data.setOptions(options);
- XByteBuffer buffer = new XByteBuffer(b.length+128,false);
+ //XByteBuffer buffer = new XByteBuffer(b.length+128,false);
+ buffer = BufferPool.getBufferPool().getBuffer(b.length+128, false);
buffer.append(b,0,b.length);
data.setMessage(buffer);
InterceptorPayload payload = null;
@@ -211,6 +214,8 @@
}catch ( Exception x ) {
if ( x instanceof ChannelException ) throw (ChannelException)x;
throw new ChannelException(x);
+ } finally {
+ if ( buffer != null ) BufferPool.getBufferPool().returnBuffer(buffer);
}
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java?rev=407634&r1=407633&r2=407634&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java Thu May 18 14:12:23 2006
@@ -44,7 +44,7 @@
private boolean run = false;
private Thread msgDispatchThread = null;
protected long currentSize = 0;
- private boolean useDeepClone = false;
+ private boolean useDeepClone = true;
public MessageDispatchInterceptor() {
setOptionFlag(Channel.SEND_OPTIONS_ASYNCHRONOUS);
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java?rev=407634&r1=407633&r2=407634&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java Thu May 18 14:12:23 2006
@@ -24,6 +24,8 @@
import org.apache.catalina.tribes.io.XByteBuffer;
import java.text.DecimalFormat;
import org.apache.catalina.tribes.membership.MemberImpl;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
@@ -34,35 +36,56 @@
* @version 1.0
*/
public class ThroughputInterceptor extends ChannelInterceptorBase {
-
+ protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ThroughputInterceptor.class);
+
double mbTx = 0;
double mbRx = 0;
double timeTx = 0;
- long msgTxCnt = 1;
- long msgRxCnt = 1;
+ AtomicLong msgTxCnt = new AtomicLong(1);
+ AtomicLong msgRxCnt = new AtomicLong(1);
+ AtomicLong msgTxErr = new AtomicLong(1);
int interval = 10000;
- DecimalFormat df = new DecimalFormat("##.00");
- int addrlength = 0;
+ AtomicInteger access = new AtomicInteger(0);
+ long start = 0;
+ DecimalFormat df = new DecimalFormat("#0.00");
+
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+ if ( access.addAndGet(1) == 1 ) start = System.currentTimeMillis();
long bytes = XByteBuffer.getDataPackageLength(((ChannelData)msg).getDataPackageLength());
- long start = System.currentTimeMillis();
- super.sendMessage(destination,msg,payload);
- long stop = System.currentTimeMillis();
- timeTx+= ((double)(stop-start))/1000d;
- mbTx += ((double)bytes)/(1024d*1024d);
- if ( msgTxCnt % interval == 0 ) report();
- msgTxCnt++;
+ try {
+ super.sendMessage(destination, msg, payload);
+ }catch ( ChannelException x ) {
+ msgTxErr.addAndGet(1);
+ access.addAndGet(-1);
+ throw x;
+ }
+ mbTx += ((double)(bytes*destination.length))/(1024d*1024d);
+ if ( access.addAndGet(-1) == 0 ) {
+ long stop = System.currentTimeMillis();
+ timeTx += ( (double) (stop - start)) / 1000d;
+ }
+
+ if (msgTxCnt.get() % interval == 0) {
+ double time = timeTx;
+
+ if ( access.get() != 0 ) {
+ long now = System.currentTimeMillis();
+ time = (double)(now - start + timeTx)/1000d;
+ }
+ report(time);
+ }
+ msgTxCnt.addAndGet(1);
}
public void messageReceived(ChannelMessage msg) {
long bytes = XByteBuffer.getDataPackageLength(((ChannelData)msg).getDataPackage().length);
mbRx += ((double)bytes)/(1024d*1024d);
- if ( msgRxCnt % interval == 0 ) report();
- msgRxCnt++;
+ if ( msgRxCnt.get() % interval == 0 ) report(timeTx);
+ msgRxCnt.addAndGet(1);
}
- public void report() {
+ public void report(double timeTx) {
StringBuffer buf = new StringBuffer("ThroughputInterceptor Report[\n\tTx Msg:");
buf.append(msgTxCnt).append(" messages\n\tSent:");
buf.append(df.format(mbTx));
@@ -70,18 +93,14 @@
buf.append(df.format(timeTx));
buf.append(" seconds\n\tSpeed:");
buf.append(df.format(mbTx/timeTx));
- buf.append(" MB/sec\n\tRx Msg:");
+ buf.append(" MB/sec\n\tError Msg:");
+ buf.append(msgTxErr).append("\n\tRx Msg:");
buf.append(msgRxCnt);
buf.append(" messages\n\tReceived:");
buf.append(df.format(mbRx)).append(" MB]\n");
- System.out.println(buf);
+ if ( log.isInfoEnabled() ) log.info(buf);
}
- public void start(int svc) throws ChannelException{
- super.start(svc);
- addrlength = ((MemberImpl)getLocalMember(true)).getData().length;
- }
-
public void setInterval(int interval) {
this.interval = interval;
}
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java?rev=407634&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java Thu May 18 14:12:23 2006
@@ -0,0 +1,87 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.io;
+
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.commons.logging.Log;
+
+/**
+ *
+ * @author Filip Hanik
+ *
+ * @version 1.0
+ */
+public class BufferPool {
+ protected static Log log = LogFactory.getLog(BufferPool.class);
+
+ public static int DEFAULT_POOL_SIZE = 100*1024*1024; //100MB
+
+
+
+ protected static BufferPool instance = null;
+ protected BufferPoolAPI pool = null;
+
+ private BufferPool(BufferPoolAPI pool) {
+ this.pool = pool;
+ }
+
+ public XByteBuffer getBuffer(int minSize, boolean discard) {
+ if ( pool != null ) return pool.getBuffer(minSize, discard);
+ else return new XByteBuffer(minSize,discard);
+ }
+
+ public void returnBuffer(XByteBuffer buffer) {
+ if ( pool != null ) pool.returnBuffer(buffer);
+ }
+
+ public void clear() {
+ if ( pool != null ) pool.clear();
+ }
+
+
+ public static BufferPool getBufferPool() {
+ if ( (instance == null) ) {
+ synchronized (BufferPool.class) {
+ if ( instance == null ) {
+ BufferPoolAPI pool = null;
+ try {
+ Class clazz = Class.forName("org.apache.catalina.tribes.io.BufferPool15Impl");
+ pool = (BufferPoolAPI)clazz.newInstance();
+ pool.setMaxSize(DEFAULT_POOL_SIZE);
+ log.info("Created a buffer pool with max size:"+DEFAULT_POOL_SIZE+" bytes.");
+ } catch ( Exception x ) {
+ log.warn("Unable to initilize BufferPool, not pooling XByteBuffer objects.",x);
+ }
+ instance = new BufferPool(pool);
+
+ }//end if
+ }//sync
+ }//end if
+ return instance;
+ }
+
+
+ public static interface BufferPoolAPI {
+ public void setMaxSize(int bytes);
+
+ public XByteBuffer getBuffer(int minSize, boolean discard);
+
+ public void returnBuffer(XByteBuffer buffer);
+
+ public void clear();
+ }
+}
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool15Impl.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool15Impl.java?rev=407634&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool15Impl.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool15Impl.java Thu May 18 14:12:23 2006
@@ -0,0 +1,62 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.io;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ */
+class BufferPool15Impl implements BufferPool.BufferPoolAPI {
+ protected int maxSize;
+ protected AtomicInteger size = new AtomicInteger(0);
+ protected ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
+
+ public void setMaxSize(int bytes) {
+ this.maxSize = bytes;
+ }
+
+
+ public XByteBuffer getBuffer(int minSize, boolean discard) {
+ XByteBuffer buffer = (XByteBuffer)queue.poll();
+ if ( buffer != null ) size.addAndGet(-buffer.getCapacity());
+ if ( buffer == null ) buffer = new XByteBuffer(minSize,discard);
+ else if ( buffer.getCapacity() <= minSize ) buffer.expand(minSize);
+ buffer.setDiscard(discard);
+ buffer.reset();
+ return buffer;
+ }
+
+ public void returnBuffer(XByteBuffer buffer) {
+ if ( (size.get() + buffer.getCapacity()) <= maxSize ) {
+ size.addAndGet(buffer.getCapacity());
+ queue.offer(buffer);
+ }
+ }
+
+ public void clear() {
+ queue.clear();
+ size.set(0);
+ }
+
+ public int getMaxSize() {
+ return maxSize;
+ }
+
+}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=407634&r1=407633&r2=407634&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Thu May 18 14:12:23 2006
@@ -20,10 +20,10 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
-import java.io.ObjectInputStream;
/**
* The XByteBuffer provides a dual functionality.
@@ -119,6 +119,10 @@
throw new ArrayIndexOutOfBoundsException("Can't trim more bytes than are available. length:"+bufSize+" trim:"+length);
bufSize -= length;
}
+
+ public void reset() {
+ bufSize = 0;
+ }
public byte[] getBytesDirect() {
return this.buf;
@@ -571,5 +575,12 @@
return data;
}
-
+ public void setDiscard(boolean discard) {
+ this.discard = discard;
+ }
+
+ public boolean getDiscard() {
+ return discard;
+ }
+
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=407634&r1=407633&r2=407634&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java Thu May 18 14:12:23 2006
@@ -80,7 +80,6 @@
public boolean process(SelectionKey key, boolean waitForAck) throws IOException {
int ops = key.readyOps();
key.interestOps(key.interestOps() & ~ops);
-
//in case disconnect has been called
if ((!isConnected()) && (!connecting)) throw new IOException("Sender has been disconnected, can't selection key.");
if ( !key.isValid() ) throw new IOException("Key is not valid, it must have been cancelled.");
Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=407634&r1=407633&r2=407634&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java Thu May 18 14:12:23 2006
@@ -31,6 +31,7 @@
import org.apache.catalina.tribes.transport.ReplicationTransmitter;
import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor;
import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
+import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
/**
* <p>Title: </p>
@@ -68,6 +69,7 @@
.append("\n\t\t[-frag]")
.append("\n\t\t[-fragsize maxmsgsize]")
.append("\n\t\t[-throughput]")
+ .append("\n\t\t[-failuredetect]")
.append("\n\t\t[-async]")
.append("\n\t\t[-asyncsize maxqueuesizeinbytes]");
return buf;
@@ -97,6 +99,7 @@
boolean async = false;
int asyncsize = 1024*1024*50; //50MB
boolean throughput = false;
+ boolean failuredetect = false;
for (int i = 0; i < args.length; i++) {
if ("-bind".equals(args[i])) {
@@ -113,6 +116,8 @@
gzip = true;
} else if ("-async".equals(args[i])) {
async = true;
+ } else if ("-failuredetect".equals(args[i])) {
+ failuredetect = true;
} else if ("-asyncsize".equals(args[i])) {
asyncsize = Integer.parseInt(args[++i]);
System.out.println("Setting MessageDispatchInterceptor.maxQueueSize="+asyncsize);
@@ -209,6 +214,11 @@
mi.setMaxQueueSize(asyncsize);
channel.addInterceptor(mi);
System.out.println("Added MessageDispatchInterceptor");
+ }
+
+ if ( failuredetect ) {
+ TcpFailureDetector tcpfi = new TcpFailureDetector();
+ channel.addInterceptor(tcpfi);
}
return channel;
Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java?rev=407634&r1=407633&r2=407634&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java Thu May 18 14:12:23 2006
@@ -390,6 +390,7 @@
t.start();
threads--;
test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
+ test.channelOptions = channelOptions;
}
test.run();
if ( shutdown && send ) channel.stop(channel.DEFAULT);
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org