You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/05/18 18:07:48 UTC

svn commit: r407578 - in /tomcat/container/tc5.5.x/modules/groupcom: src/share/org/apache/catalina/tribes/group/interceptors/ src/share/org/apache/catalina/tribes/io/ src/share/org/apache/catalina/tribes/membership/ test/java/org/apache/catalina/tribes...

Author: fhanik
Date: Thu May 18 09:07:47 2006
New Revision: 407578

URL: http://svn.apache.org/viewvc?rev=407578&view=rev
Log:
Added a byte pool that caches byte[] arrays so buffers can be reused instead of reallocated.
The pool can still be tuned, but as is, it's already performing quite well.
The pool is compatible with JDK 1.4: when the 1.5 implementation cannot be loaded, it falls back to not pooling.

Added:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BytePool.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BytePool15Impl.java
Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java
    tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java?rev=407578&r1=407577&r2=407578&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java Thu May 18 09:07:47 2006
@@ -23,6 +23,7 @@
 import org.apache.catalina.tribes.io.ChannelData;
 import org.apache.catalina.tribes.io.XByteBuffer;
 import java.text.DecimalFormat;
+import org.apache.catalina.tribes.membership.MemberImpl;
 
 
 
@@ -41,9 +42,10 @@
     long msgRxCnt = 1;
     int interval = 10000;
     DecimalFormat df = new DecimalFormat("##.00");
+    int addrlength = 0;
 
     public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
-        long bytes = XByteBuffer.getDataPackageLength(((ChannelData)msg).getDataPackage().length);
+        long bytes = XByteBuffer.getDataPackageLength(((ChannelData)msg).getDataPackageLength(addrlength));
         long start = System.currentTimeMillis();
         super.sendMessage(destination,msg,payload);
         long stop = System.currentTimeMillis();
@@ -73,6 +75,11 @@
         buf.append(" messages\n\tReceived:");
         buf.append(df.format(mbRx)).append(" MB]\n");
         System.out.println(buf);
+    }
+    
+    public void start(int svc) throws ChannelException{
+        super.start(svc);
+        addrlength = ((MemberImpl)getLocalMember(true)).getData().length;
     }
 
     public void setInterval(int interval) {

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BytePool.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BytePool.java?rev=407578&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BytePool.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BytePool.java Thu May 18 09:07:47 2006
@@ -0,0 +1,158 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.io;
+
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.commons.logging.Log;
+
+/**
+ * Facade through which callers obtain and recycle <code>byte[]</code> buffers.
+ * <p>
+ * The pooling itself is delegated to a {@link BytePoolAPI} implementation that
+ * {@link #getBytePool()} loads reflectively. If that implementation cannot be
+ * loaded the facade still works: every request allocates a fresh array and
+ * returned arrays are ignored.
+ *
+ * @author Filip Hanik
+ *
+ * @version 1.0
+ */
+public class BytePool {
+    protected static Log log = LogFactory.getLog(BytePool.class);
+
+    /** Limit, in bytes, passed to the pool implementation when the singleton is created. */
+    public static int DEFAULT_POOL_SIZE = 100*1024*1024; //100MB
+
+    /**
+     * Lazily created singleton. Declared <code>volatile</code> so that the
+     * unsynchronized read in {@link #getBytePool()} cannot observe a partially
+     * constructed instance (guaranteed by the Java 5+ memory model).
+     */
+    protected static volatile BytePool instance = null;
+
+    /** Backing implementation, or <code>null</code> when pooling is unavailable. */
+    protected BytePoolAPI pool = null;
+
+    private BytePool(BytePoolAPI pool) {
+        this.pool = pool;
+    }
+
+    /**
+     * Returns an array that is at least <code>minSize</code> bytes long.
+     *
+     * @param minSize the minimum required length
+     * @return the array supplied by the pool implementation, or a new
+     *         <code>byte[minSize]</code> when no implementation is loaded
+     */
+    public byte[] getArray(int minSize) {
+        if ( pool == null ) return new byte[minSize];
+        return pool.getArray(minSize);
+    }
+
+    /**
+     * Returns an array whose length is exactly <code>size</code> bytes.
+     *
+     * @param size the required length
+     * @return the array supplied by the pool implementation, or a new
+     *         <code>byte[size]</code> when no implementation is loaded
+     */
+    public byte[] getFixedArray(int size) {
+        if ( pool == null ) return new byte[size];
+        return pool.getFixedArray(size);
+    }
+
+    /**
+     * Hands an array back for reuse. Does nothing when no implementation is loaded.
+     *
+     * @param array the array the caller no longer needs
+     */
+    public void returnArray(byte[] array) {
+        if ( pool != null ) pool.returnArray(array);
+    }
+
+    /**
+     * Clears the pool implementation, if one is loaded.
+     */
+    public void clear() {
+        if ( pool != null ) pool.clear();
+    }
+
+    /**
+     * Returns the shared instance, creating it on first use.
+     *
+     * @return the singleton, never <code>null</code>
+     */
+    public static BytePool getBytePool() {
+        BytePool result = instance; //one volatile read on the common path
+        if ( result == null ) {
+            synchronized (BytePool.class) {
+                result = instance;
+                if ( result == null ) {
+                    result = new BytePool(loadPool());
+                    instance = result;
+                }//end if
+            }//sync
+        }//end if
+        return result;
+    }
+
+    /**
+     * Reflectively instantiates the pooling implementation and applies
+     * {@link #DEFAULT_POOL_SIZE}.
+     * <p>
+     * {@link LinkageError} is handled in addition to ordinary exceptions: on a
+     * VM that lacks <code>java.util.concurrent</code>, loading the
+     * implementation fails with an <code>Error</code> such as
+     * <code>NoClassDefFoundError</code>, which <code>catch (Exception)</code>
+     * alone would let escape instead of falling back to no pooling.
+     *
+     * @return the implementation, or <code>null</code> if it could not be set up
+     */
+    private static BytePoolAPI loadPool() {
+        try {
+            Class clazz = Class.forName("org.apache.catalina.tribes.io.BytePool15Impl");
+            BytePoolAPI pool = (BytePoolAPI)clazz.newInstance();
+            pool.setMaxSize(DEFAULT_POOL_SIZE);
+            return pool;
+        } catch ( Exception x ) {
+            log.warn("Unable to initilize BytePool, not pooling byte[] objects.",x);
+        } catch ( LinkageError x ) {
+            log.warn("Unable to initilize BytePool, not pooling byte[] objects.",x);
+        }
+        return null;
+    }
+
+    /**
+     * Contract implemented by the actual pooling strategy.
+     */
+    public static interface BytePoolAPI {
+        /** Sets the limit, in bytes, that the implementation applies to pooled data. */
+        public void setMaxSize(int bytes);
+
+        /** Returns an array of at least <code>minSize</code> bytes. */
+        public byte[] getArray(int minSize);
+
+        /** Returns an array of exactly <code>size</code> bytes. */
+        public byte[] getFixedArray(int size);
+
+        /** Offers an array back to the pool for reuse. */
+        public void returnArray(byte[] array);
+
+        /** Discards whatever the pool currently holds. */
+        public void clear();
+    }
+}
\ No newline at end of file

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BytePool15Impl.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BytePool15Impl.java?rev=407578&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BytePool15Impl.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BytePool15Impl.java Thu May 18 09:07:47 2006
@@ -0,0 +1,108 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.io;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link BytePool.BytePoolAPI} implementation built on
+ * <code>java.util.concurrent</code> classes.
+ * <p>
+ * Returned arrays are kept in a queue and {@link #size} tracks the sum of
+ * their lengths so that the pool stays within {@link #maxSize} bytes. Arrays
+ * are handed out as they were returned; their contents are not cleared.
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ */
+class BytePool15Impl implements BytePool.BytePoolAPI {
+    /** Maximum combined length, in bytes, of the pooled arrays. */
+    protected int maxSize;
+    /** Combined length, in bytes, of the arrays currently accounted for. */
+    protected AtomicInteger size = new AtomicInteger(0);
+    /** Holds the pooled <code>byte[]</code> instances. */
+    protected ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
+
+    /** @param bytes maximum combined length, in bytes, of the pooled arrays */
+    public void setMaxSize(int bytes) {
+        this.maxSize = bytes;
+    }
+
+    /**
+     * Takes the array at the head of the queue and returns it if it is large
+     * enough; otherwise a new array of exactly <code>minSize</code> bytes is
+     * allocated. A polled array that is too small is not put back.
+     *
+     * @param minSize the minimum required length
+     * @return an array with <code>length &gt;= minSize</code>
+     */
+    public byte[] getArray(int minSize) {
+        byte[] array = (byte[])queue.poll();
+        if ( array == null ) return new byte[minSize];
+        size.addAndGet(-array.length);
+        //an exact fit is usable too; rejecting it would make getFixedArray() miss every time
+        if ( array.length < minSize ) array = new byte[minSize];
+        return array;
+    }
+
+    /**
+     * Returns an array of exactly <code>size</code> bytes. A pooled array is
+     * reused only when the one obtained from {@link #getArray(int)} has
+     * precisely that length.
+     *
+     * @param size the required length
+     * @return an array with <code>length == size</code>
+     */
+    public byte[] getFixedArray(int size) {
+        byte[] array = getArray(size);
+        if ( array.length != size ) array = new byte[size];
+        return array;
+    }
+
+    /**
+     * Adds the array to the pool unless that would exceed {@link #maxSize}.
+     * The bytes are reserved with a compare-and-set loop before the array is
+     * queued, so the limit check and the counter update cannot be interleaved
+     * by another thread. The check is written as a subtraction to avoid the
+     * <code>int</code> overflow of adding the two lengths.
+     *
+     * @param array the array to recycle; <code>null</code> and zero-length
+     *              arrays are ignored (the latter would grow the queue without
+     *              ever counting against the limit)
+     */
+    public void returnArray(byte[] array) {
+        if ( array == null || array.length == 0 ) return;
+        int current;
+        do {
+            current = size.get();
+            if ( array.length > maxSize - current ) return; //pool is full
+        } while ( !size.compareAndSet(current, current + array.length) );
+        queue.offer(array);
+    }
+
+    /** Empties the queue and resets the byte counter to zero. */
+    public void clear() {
+        queue.clear();
+        size.set(0);
+    }
+
+    /** @return the limit set through {@link #setMaxSize(int)} */
+    public int getMaxSize() {
+        return maxSize;
+    }
+
+}
\ No newline at end of file

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java?rev=407578&r1=407577&r2=407578&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java Thu May 18 09:07:47 2006
@@ -164,22 +164,27 @@
         setUniqueId(data);
     }
 
-    
-    /**
-     * Serializes the ChannelData object into a byte[] array
-     * @return byte[]
-     */
-    public byte[] getDataPackage()  {
-        byte[] addr = ((MemberImpl)address).getData(false);
+    public int getDataPackageLength(int addrlength) {
         int length = 
             4 + //options
             8 + //timestamp  off=4
             4 + //unique id length off=12
             uniqueId.length+ //id data off=12+uniqueId.length
             4 + //addr length off=12+uniqueId.length+4
-            addr.length+ //member data off=12+uniqueId.length+4+add.length
+            addrlength+ //member data off=12+uniqueId.length+4+add.length
             4 + //message length off=12+uniqueId.length+4+add.length+4
             message.getLength();
+        return length;
+
+    }
+    
+    /**
+     * Serializes the ChannelData object into a byte[] array
+     * @return byte[]
+     */
+    public byte[] getDataPackage()  {
+        byte[] addr = ((MemberImpl)address).getData(false);
+        int length = getDataPackageLength(addr.length);
         byte[] data = new byte[length];
         int offset = 0;
         XByteBuffer.toBytes(options,data,offset);

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=407578&r1=407577&r2=407578&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Thu May 18 09:07:47 2006
@@ -91,9 +91,11 @@
     /**
      * Constructs a new XByteBuffer
      * @param size - the initial size of the byte buffer
+     * @todo use a pool of byte[] for performance
      */
     public XByteBuffer(int size, boolean discard) {
-        buf = new byte[size];
+        //buf = new byte[size];
+        buf = BytePool.getBytePool().getArray(size);
         this.discard = discard;
     }
     
@@ -103,7 +105,8 @@
     
     public XByteBuffer(byte[] data, int size,boolean discard) {
         int length = Math.max(data.length,size);
-        buf = new byte[length];
+        //buf = new byte[length];
+        buf = BytePool.getBytePool().getArray(length);
         System.arraycopy(data,0,buf,0,data.length);
         bufSize = data.length;
         this.discard = discard;
@@ -127,7 +130,8 @@
      * Returns the bytes in the buffer, in its exact length
      */
     public byte[] getBytes() {
-        byte[] b = new byte[bufSize];
+        //byte[] b = new byte[bufSize];
+        byte[] b = BytePool.getBytePool().getFixedArray(bufSize);
         System.arraycopy(buf,0,b,0,bufSize);
         return b;
     }
@@ -235,7 +239,8 @@
 
     public void expand(int newcount) {
         //don't change the allocation strategy
-        byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
+        //byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
+        byte newbuf[] = BytePool.getBytePool().getArray(Math.max(buf.length << 1, newcount));
         System.arraycopy(buf, 0, newbuf, 0, bufSize);
         buf = newbuf;
     }
@@ -306,7 +311,8 @@
         int psize = countPackages(true);
         if (psize == 0) throw new java.lang.IllegalStateException("No package exists in XByteBuffer");
         int size = toInt(buf, START_DATA.length);
-        byte[] data = new byte[size];
+        //byte[] data = new byte[size];
+        byte[] data = BytePool.getBytePool().getFixedArray(size);
         System.arraycopy(buf, START_DATA.length + 4, data, 0, size);
         if (clearFromBuffer) {
             int totalsize = START_DATA.length + 4 + size + END_DATA.length;
@@ -346,7 +352,8 @@
     
     public static byte[] createDataPackage(byte[] data) {
         int length = getDataPackageLength(data.length);
-        byte[] result = new byte[length];
+        //byte[] result = new byte[length];
+        byte[] result = BytePool.getBytePool().getFixedArray(length);
         return createDataPackage(data,0,data.length,result,0);
     }
     

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java?rev=407578&r1=407577&r2=407578&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java Thu May 18 09:07:47 2006
@@ -24,6 +24,7 @@
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.tribes.transport.SenderState;
+import org.apache.catalina.tribes.io.BytePool;
 
 /**
  * A <b>membership</b> implementation using simple multicast.
@@ -184,7 +185,8 @@
         byte[] addr = host;
         long alive=System.currentTimeMillis()-getServiceStartTime();
         byte hl = (byte)addr.length;
-        byte[] data = new byte[8+4+1+addr.length+16+4+payload.length];
+        //byte[] data = new byte[8+4+1+addr.length+16+4+payload.length];
+        byte[] data = BytePool.getBytePool().getFixedArray(8+4+1+addr.length+16+4+payload.length);
         int pos = 0;
         //alive data
         XByteBuffer.toBytes((long)alive,data,0);
@@ -256,6 +258,7 @@
        member.payload = payload;
        
        member.dataPkg = new byte[data.length];
+       //member.dataPkg = BytePool.getBytePool().getFixedArray(data.length);
        System.arraycopy(data,0,member.dataPkg,0,data.length);
        
        return member;
@@ -406,7 +409,8 @@
 
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         int length = in.readInt();
-        byte[] message = new byte[length];
+        //byte[] message = new byte[length];
+        byte[] message = BytePool.getBytePool().getFixedArray(length);
         in.read(message);
         getMember(message,this);
         

Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=407578&r1=407577&r2=407578&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java Thu May 18 09:07:47 2006
@@ -30,6 +30,7 @@
 import org.apache.catalina.tribes.transport.ReceiverBase;
 import org.apache.catalina.tribes.transport.ReplicationTransmitter;
 import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor;
+import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
 
 /**
  * <p>Title: </p>
@@ -204,7 +205,7 @@
         }
         
         if ( async ) {
-            MessageDispatchInterceptor mi = new MessageDispatchInterceptor();
+            MessageDispatchInterceptor mi = new MessageDispatch15Interceptor();
             mi.setMaxQueueSize(asyncsize);
             channel.addInterceptor(mi);
             System.out.println("Added MessageDispatchInterceptor");



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org