You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/05/18 23:12:24 UTC

svn commit: r407634 - in /tomcat/container/tc5.5.x/modules/groupcom: src/share/org/apache/catalina/tribes/group/ src/share/org/apache/catalina/tribes/group/interceptors/ src/share/org/apache/catalina/tribes/io/ src/share/org/apache/catalina/tribes/tran...

Author: fhanik
Date: Thu May 18 14:12:23 2006
New Revision: 407634

URL: http://svn.apache.org/viewvc?rev=407634&view=rev
Log:
The throughput interceptor now measures throughput correctly, taking both multithreading and multiple destinations into account.
Implemented a pool of buffers used by the group to send data down the stack.

Added:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool15Impl.java
Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java
    tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=407634&r1=407633&r2=407634&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Thu May 18 14:12:23 2006
@@ -38,6 +38,7 @@
 import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.tribes.UniqueId;
 import org.apache.catalina.tribes.Heartbeat;
+import org.apache.catalina.tribes.io.BufferPool;
 
 /**
  * The default implementation of a Channel.<br>
@@ -184,6 +185,7 @@
      */
     public UniqueId send(Member[] destination, Serializable msg, int options, ErrorHandler handler) throws ChannelException {
         if ( msg == null ) throw new ChannelException("Cant send a NULL message");
+        XByteBuffer buffer = null;
         try {
             if ( destination == null || destination.length == 0) throw new ChannelException("No destination given");
             ChannelData data = new ChannelData(true);//generates a unique Id
@@ -198,7 +200,8 @@
                 options = options & (~SEND_OPTIONS_BYTE_MESSAGE);
             }
             data.setOptions(options);
-            XByteBuffer buffer = new XByteBuffer(b.length+128,false);
+            //XByteBuffer buffer = new XByteBuffer(b.length+128,false);
+            buffer = BufferPool.getBufferPool().getBuffer(b.length+128, false);
             buffer.append(b,0,b.length);
             data.setMessage(buffer);
             InterceptorPayload payload = null;
@@ -211,6 +214,8 @@
         }catch ( Exception x ) {
             if ( x instanceof ChannelException ) throw (ChannelException)x;
             throw new ChannelException(x);
+        } finally {
+            if ( buffer != null ) BufferPool.getBufferPool().returnBuffer(buffer);
         }
     }
     

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java?rev=407634&r1=407633&r2=407634&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java Thu May 18 14:12:23 2006
@@ -44,7 +44,7 @@
     private boolean run = false;
     private Thread msgDispatchThread = null;
     protected long currentSize = 0;
-    private boolean useDeepClone = false;
+    private boolean useDeepClone = true;
 
     public MessageDispatchInterceptor() {
         setOptionFlag(Channel.SEND_OPTIONS_ASYNCHRONOUS);

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java?rev=407634&r1=407633&r2=407634&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java Thu May 18 14:12:23 2006
@@ -24,6 +24,8 @@
 import org.apache.catalina.tribes.io.XByteBuffer;
 import java.text.DecimalFormat;
 import org.apache.catalina.tribes.membership.MemberImpl;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 
 
@@ -34,35 +36,56 @@
  * @version 1.0
  */
 public class ThroughputInterceptor extends ChannelInterceptorBase {
-    
+    protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ThroughputInterceptor.class);
+
     double mbTx = 0;
     double mbRx = 0;
     double timeTx = 0;
-    long msgTxCnt = 1;
-    long msgRxCnt = 1;
+    AtomicLong msgTxCnt = new AtomicLong(1);
+    AtomicLong msgRxCnt = new AtomicLong(1);
+    AtomicLong msgTxErr = new AtomicLong(1);
     int interval = 10000;
-    DecimalFormat df = new DecimalFormat("##.00");
-    int addrlength = 0;
+    AtomicInteger access = new AtomicInteger(0);
+    long start = 0;
+    DecimalFormat df = new DecimalFormat("#0.00");
+
 
     public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+        if ( access.addAndGet(1) == 1 ) start = System.currentTimeMillis(); // the sender that takes the in-flight count from 0 to 1 records the start of the timing window
         long bytes = XByteBuffer.getDataPackageLength(((ChannelData)msg).getDataPackageLength());
-        long start = System.currentTimeMillis();
-        super.sendMessage(destination,msg,payload);
-        long stop = System.currentTimeMillis();
-        timeTx+= ((double)(stop-start))/1000d;
-        mbTx += ((double)bytes)/(1024d*1024d);
-        if ( msgTxCnt % interval == 0 ) report();
-        msgTxCnt++;
+        try {
+            super.sendMessage(destination, msg, payload);
+        }catch ( ChannelException x ) {
+            msgTxErr.addAndGet(1); // failed sends are counted here and add nothing to mbTx; NOTE(review): msgTxErr is initialised to 1 in this class, so the reported error count starts one high -- confirm
+            access.addAndGet(-1); // keeps the in-flight count balanced; NOTE(review): timeTx is not updated on this path even if the count reaches 0 -- confirm intended
+            throw x;
+        } 
+        mbTx += ((double)(bytes*destination.length))/(1024d*1024d); // bytes are counted once per destination; NOTE(review): mbTx is a plain double updated without synchronization -- confirm acceptable with concurrent senders
+        if ( access.addAndGet(-1) == 0 ) { // the sender that brings the in-flight count back to 0 closes the timing window
+            long stop = System.currentTimeMillis();
+            timeTx += ( (double) (stop - start)) / 1000d; // accumulated in seconds
+        }
+
+        if (msgTxCnt.get() % interval == 0) { // report whenever the sent-message counter is a multiple of 'interval'
+            double time = timeTx;
+
+            if ( access.get() != 0 ) { // other sends are still in flight, so the currently open window is included as well
+                long now = System.currentTimeMillis();
+                time = (double)(now - start + timeTx)/1000d; // NOTE(review): timeTx is in seconds (see above) but is added to a millisecond difference before the /1000d -- units look mixed, confirm
+            }
+            report(time);
+        }
+        msgTxCnt.addAndGet(1); // NOTE(review): the get() above and this increment are two separate atomic operations, not one
     }
 
     public void messageReceived(ChannelMessage msg) {
         long bytes = XByteBuffer.getDataPackageLength(((ChannelData)msg).getDataPackage().length);
         mbRx += ((double)bytes)/(1024d*1024d);
-        if ( msgRxCnt % interval == 0 ) report();
-        msgRxCnt++;
+        if ( msgRxCnt.get() % interval == 0 ) report(timeTx); // report whenever the received-message counter is a multiple of 'interval', using the accumulated send time
+        msgRxCnt.addAndGet(1); // NOTE(review): no super.messageReceived(msg) call appears in this method -- confirm the message is not meant to be passed on
     }
     
-    public void report() {
+    public void report(double timeTx) {
         StringBuffer buf = new StringBuffer("ThroughputInterceptor Report[\n\tTx Msg:");
         buf.append(msgTxCnt).append(" messages\n\tSent:");
         buf.append(df.format(mbTx));
@@ -70,18 +93,14 @@
         buf.append(df.format(timeTx));
         buf.append(" seconds\n\tSpeed:");
         buf.append(df.format(mbTx/timeTx));
-        buf.append(" MB/sec\n\tRx Msg:");
+        buf.append(" MB/sec\n\tError Msg:");
+        buf.append(msgTxErr).append("\n\tRx Msg:");
         buf.append(msgRxCnt);
         buf.append(" messages\n\tReceived:");
         buf.append(df.format(mbRx)).append(" MB]\n");
-        System.out.println(buf);
+        if ( log.isInfoEnabled() ) log.info(buf);
     }
     
-    public void start(int svc) throws ChannelException{
-        super.start(svc);
-        addrlength = ((MemberImpl)getLocalMember(true)).getData().length;
-    }
-
     public void setInterval(int interval) {
         this.interval = interval;
     }

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java?rev=407634&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java Thu May 18 14:12:23 2006
@@ -0,0 +1,87 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.io;
+
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.commons.logging.Log;
+
+/**
+ * Singleton entry point for obtaining and returning XByteBuffer objects; delegates to a BufferPoolAPI when one was loaded, otherwise allocates a new buffer per request.
+ * @author Filip Hanik
+ * 
+ * @version 1.0
+ */
+public class BufferPool {
+    protected static Log log = LogFactory.getLog(BufferPool.class);
+    // Size passed to BufferPoolAPI.setMaxSize() when the singleton is created; not final, so it can be reassigned before the first getBufferPool() call.
+    public static int DEFAULT_POOL_SIZE = 100*1024*1024; //100MB
+
+
+    // 'instance' is the single shared object, created lazily by getBufferPool(); 'pool' is the delegate and may be null.
+    protected static BufferPool instance = null;
+    protected BufferPoolAPI pool = null;
+
+    private BufferPool(BufferPoolAPI pool) {
+        this.pool = pool;
+    }
+    // Returns a buffer from the delegate pool, or a newly constructed XByteBuffer(minSize, discard) when there is no delegate.
+    public XByteBuffer getBuffer(int minSize, boolean discard) {
+        if ( pool != null ) return pool.getBuffer(minSize, discard);
+        else return new XByteBuffer(minSize,discard);
+    }
+    // Hands the buffer back to the delegate pool; does nothing when there is no delegate.
+    public void returnBuffer(XByteBuffer buffer) {
+        if ( pool != null ) pool.returnBuffer(buffer);
+    }
+    // Clears the delegate pool, if there is one.
+    public void clear() {
+        if ( pool != null ) pool.clear();
+    }
+
+    // Returns the shared BufferPool, creating it on first use; a failure to set up the delegate is logged and not propagated.
+    public static BufferPool getBufferPool() {
+        if (  (instance == null) ) { // NOTE(review): this first check reads the non-volatile 'instance' outside the synchronized block -- confirm that is safe on the target JVM
+            synchronized (BufferPool.class) {
+                if ( instance == null ) {
+                   BufferPoolAPI pool = null;
+                   try {
+                       Class clazz = Class.forName("org.apache.catalina.tribes.io.BufferPool15Impl"); // loaded by name; presumably to avoid a compile-time link to the java.util.concurrent-based class -- TODO confirm
+                       pool = (BufferPoolAPI)clazz.newInstance();
+                       pool.setMaxSize(DEFAULT_POOL_SIZE);
+                       log.info("Created a buffer pool with max size:"+DEFAULT_POOL_SIZE+" bytes.");
+                   } catch ( Exception x ) {
+                       log.warn("Unable to initilize BufferPool, not pooling XByteBuffer objects.",x); // if loading or instantiation failed, pool is still null and getBuffer() falls back to plain allocation
+                   }
+                   instance = new BufferPool(pool);
+
+                }//end if
+            }//sync
+        }//end if
+        return instance;
+    }
+
+    // Contract for the actual pool; BufferPool forwards getBuffer/returnBuffer/clear to it and calls setMaxSize once at creation.
+    public static interface BufferPoolAPI {
+        public void setMaxSize(int bytes);
+
+        public XByteBuffer getBuffer(int minSize, boolean discard);
+
+        public void returnBuffer(XByteBuffer buffer);
+
+        public void clear();
+    }    
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool15Impl.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool15Impl.java?rev=407634&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool15Impl.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool15Impl.java Thu May 18 14:12:23 2006
@@ -0,0 +1,62 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.io;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * BufferPoolAPI implementation that keeps returned buffers in a ConcurrentLinkedQueue and tracks their combined capacity in an AtomicInteger.
+ * @author Filip Hanik
+ * @version 1.0
+ */
+class BufferPool15Impl implements BufferPool.BufferPoolAPI {
+    protected int maxSize;
+    protected AtomicInteger size = new AtomicInteger(0); // sum of getCapacity() over the buffers currently queued
+    protected ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
+
+    public void setMaxSize(int bytes) {
+        this.maxSize = bytes;
+    }
+
+    // Reuses the head of the queue if there is one, otherwise allocates a new buffer; the result has its discard flag set and is reset before being returned.
+    public XByteBuffer getBuffer(int minSize, boolean discard) {
+        XByteBuffer buffer = (XByteBuffer)queue.poll();
+        if ( buffer != null ) size.addAndGet(-buffer.getCapacity()); // the buffer left the queue, so its capacity no longer counts towards the total
+        if ( buffer == null ) buffer = new XByteBuffer(minSize,discard);
+        else if ( buffer.getCapacity() <= minSize ) buffer.expand(minSize); // presumably grows the buffer to hold at least minSize -- expand() is not shown here, confirm
+        buffer.setDiscard(discard);
+        buffer.reset();
+        return buffer;
+    }
+    // Queues the buffer for reuse only if that keeps the tracked total within maxSize; otherwise the buffer is simply not pooled.
+    public void returnBuffer(XByteBuffer buffer) {
+        if ( (size.get() + buffer.getCapacity()) <= maxSize ) { // NOTE(review): this check and the update below are separate steps, so concurrent callers may both pass the check -- confirm acceptable
+            size.addAndGet(buffer.getCapacity());
+            queue.offer(buffer);
+        }
+    }
+    // Empties the queue and zeroes the tracked total; NOTE(review): the two calls are not one atomic step.
+    public void clear() {
+        queue.clear();
+        size.set(0);
+    }
+
+    public int getMaxSize() {
+        return maxSize;
+    }
+
+}

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=407634&r1=407633&r2=407634&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Thu May 18 14:12:23 2006
@@ -20,10 +20,10 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
-import java.io.ObjectInputStream;
 
 /**
  * The XByteBuffer provides a dual functionality.
@@ -119,6 +119,10 @@
             throw new ArrayIndexOutOfBoundsException("Can't trim more bytes than are available. length:"+bufSize+" trim:"+length);
         bufSize -= length;
     }
+    
+    public void reset() {
+        bufSize = 0; // only bufSize is set to zero; nothing else is modified in this method
+    }
             
     public byte[] getBytesDirect() {
         return this.buf;
@@ -571,5 +575,12 @@
         return data;
     }
 
-    
+    public void setDiscard(boolean discard) {
+        this.discard = discard; // overwrites the stored flag; presumably the same flag the constructor's second argument sets -- confirm
+    }
+
+    public boolean getDiscard() {
+        return discard; // current value of the discard flag, as last set via setDiscard() or at construction
+    }
+
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=407634&r1=407633&r2=407634&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java Thu May 18 14:12:23 2006
@@ -80,7 +80,6 @@
     public boolean process(SelectionKey key, boolean waitForAck) throws IOException {
         int ops = key.readyOps();
         key.interestOps(key.interestOps() & ~ops);
-        
         //in case disconnect has been called
         if ((!isConnected()) && (!connecting)) throw new IOException("Sender has been disconnected, can't selection key.");
         if ( !key.isValid() ) throw new IOException("Key is not valid, it must have been cancelled.");

Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=407634&r1=407633&r2=407634&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java Thu May 18 14:12:23 2006
@@ -31,6 +31,7 @@
 import org.apache.catalina.tribes.transport.ReplicationTransmitter;
 import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor;
 import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
+import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
 
 /**
  * <p>Title: </p>
@@ -68,6 +69,7 @@
            .append("\n\t\t[-frag]")
            .append("\n\t\t[-fragsize maxmsgsize]")
            .append("\n\t\t[-throughput]")
+           .append("\n\t\t[-failuredetect]")
            .append("\n\t\t[-async]")
            .append("\n\t\t[-asyncsize maxqueuesizeinbytes]");
        return buf;
@@ -97,6 +99,7 @@
         boolean async = false;
         int asyncsize = 1024*1024*50; //50MB
         boolean throughput = false;
+        boolean failuredetect = false;
         
         for (int i = 0; i < args.length; i++) {
             if ("-bind".equals(args[i])) {
@@ -113,6 +116,8 @@
                 gzip = true;
             } else if ("-async".equals(args[i])) {
                 async = true;
+            } else if ("-failuredetect".equals(args[i])) {
+                failuredetect = true;
             } else if ("-asyncsize".equals(args[i])) {
                 asyncsize = Integer.parseInt(args[++i]);
                 System.out.println("Setting MessageDispatchInterceptor.maxQueueSize="+asyncsize);
@@ -209,6 +214,11 @@
             mi.setMaxQueueSize(asyncsize);
             channel.addInterceptor(mi);
             System.out.println("Added MessageDispatchInterceptor");
+        }
+        
+        if ( failuredetect ) {
+            TcpFailureDetector tcpfi = new TcpFailureDetector();
+            channel.addInterceptor(tcpfi);
         }
         
         return channel;

Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java?rev=407634&r1=407633&r2=407634&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java Thu May 18 14:12:23 2006
@@ -390,6 +390,7 @@
             t.start();
             threads--;
             test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
+            test.channelOptions = channelOptions;
         }
         test.run();
         if ( shutdown && send ) channel.stop(channel.DEFAULT);



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org