You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2006/03/11 09:29:25 UTC

svn commit: r385038 - in /directory/trunks/mina/core/src: main/java/org/apache/mina/common/PooledByteBufferAllocator.java main/java/org/apache/mina/util/ExpiringStack.java test/java/org/apache/mina/common/ByteBufferTest.java

Author: trustin
Date: Sat Mar 11 00:29:23 2006
New Revision: 385038

URL: http://svn.apache.org/viewcvs?rev=385038&view=rev
Log:
Related issue: DIRMINA-176 (ByteBuffer pool manager which prevents endlessly increasing pool size.)
* Added ExpiringStack to expire too old buffers
* PooledByteBufferAllocator now expires buffers which has not been in use for the period user specified (timeout).
* Added a test case for the change above.

Added:
    directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringStack.java   (with props)
Modified:
    directory/trunks/mina/core/src/main/java/org/apache/mina/common/PooledByteBufferAllocator.java
    directory/trunks/mina/core/src/test/java/org/apache/mina/common/ByteBufferTest.java

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/common/PooledByteBufferAllocator.java
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/common/PooledByteBufferAllocator.java?rev=385038&r1=385037&r2=385038&view=diff
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/common/PooledByteBufferAllocator.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/common/PooledByteBufferAllocator.java Sat Mar 11 00:29:23 2006
@@ -26,7 +26,7 @@
 import java.nio.LongBuffer;
 import java.nio.ShortBuffer;
 
-import org.apache.mina.util.Stack;
+import org.apache.mina.util.ExpiringStack;
 
 /**
  * A {@link ByteBufferAllocator} which pools allocated buffers.
@@ -48,30 +48,69 @@
 {
     private final int MINIMUM_CAPACITY = 1;
 
-    private final Stack containerStack = new Stack();
+    private final ExpiringStack containerStack = new ExpiringStack();
+    private int timeout;
 
-    private final Stack[] heapBufferStacks = new Stack[] {
-            new Stack(), new Stack(), new Stack(), new Stack(),
-            new Stack(), new Stack(), new Stack(), new Stack(),
-            new Stack(), new Stack(), new Stack(), new Stack(),
-            new Stack(), new Stack(), new Stack(), new Stack(),
-            new Stack(), new Stack(), new Stack(), new Stack(),
-            new Stack(), new Stack(), new Stack(), new Stack(),
-            new Stack(), new Stack(), new Stack(), new Stack(),
-            new Stack(), new Stack(), new Stack(), new Stack(), };
-    
-    private final Stack[] directBufferStacks = new Stack[] {
-            new Stack(), new Stack(), new Stack(), new Stack(),
-            new Stack(), new Stack(), new Stack(), new Stack(),
-            new Stack(), new Stack(), new Stack(), new Stack(),
-            new Stack(), new Stack(), new Stack(), new Stack(),
-            new Stack(), new Stack(), new Stack(), new Stack(),
-            new Stack(), new Stack(), new Stack(), new Stack(),
-            new Stack(), new Stack(), new Stack(), new Stack(),
-            new Stack(), new Stack(), new Stack(), new Stack(), };
+    private final ExpiringStack[] heapBufferStacks = new ExpiringStack[] {
+            new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
+            new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
+            new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
+            new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
+            new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
+            new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
+            new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
+            new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
+            new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
+            new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
+            new ExpiringStack(), new ExpiringStack(), };
+    
+    private final ExpiringStack[] directBufferStacks = new ExpiringStack[] {
+            new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
+            new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
+            new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
+            new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
+            new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
+            new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
+            new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
+            new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
+            new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
+            new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
+            new ExpiringStack(), new ExpiringStack(), };
     
     public PooledByteBufferAllocator()
     {
+        this( 60 );
+    }
+    
+    public PooledByteBufferAllocator( int timeout )
+    {
+        setTimeout( timeout );
+        new Expirer().start();
+    }
+    
+    public int getTimeout()
+    {
+        return timeout;
+    }
+    
+    public long getTimeoutMillis()
+    {
+        return timeout * 1000L;
+    }
+    
+    public void setTimeout( int timeout )
+    {
+        if( timeout < 0 )
+        {
+            timeout = 0;
+        }
+
+        this.timeout = timeout;
+        
+        if( timeout > 0 )
+        {
+            
+        }
     }
     
     public ByteBuffer allocate( int capacity, boolean direct )
@@ -99,9 +138,9 @@
     
     private java.nio.ByteBuffer allocate0( int capacity, boolean direct )
     {
-        Stack[] bufferStacks = direct? directBufferStacks : heapBufferStacks;
+        ExpiringStack[] bufferStacks = direct? directBufferStacks : heapBufferStacks;
         int idx = getBufferStackIndex( bufferStacks, capacity );
-        Stack stack = bufferStacks[ idx ];
+        ExpiringStack stack = bufferStacks[ idx ];
 
         java.nio.ByteBuffer buf;
         synchronized( stack )
@@ -120,8 +159,8 @@
     
     private void release0( java.nio.ByteBuffer buf )
     {
-        Stack[] bufferStacks = buf.isDirect()? directBufferStacks : heapBufferStacks;
-        Stack stack = bufferStacks[ getBufferStackIndex( bufferStacks, buf.capacity() ) ];
+        ExpiringStack[] bufferStacks = buf.isDirect()? directBufferStacks : heapBufferStacks;
+        ExpiringStack stack = bufferStacks[ getBufferStackIndex( bufferStacks, buf.capacity() ) ];
         synchronized( stack )
         {
             // push back
@@ -137,7 +176,7 @@
         return buf;
     }
 
-    private int getBufferStackIndex( Stack[] bufferStacks, int size )
+    private int getBufferStackIndex( ExpiringStack[] bufferStacks, int size )
     {
         int targetSize = MINIMUM_CAPACITY;
         int stackIdx = 0;
@@ -155,17 +194,74 @@
         return stackIdx;
     }
 
+    private class Expirer extends Thread
+    {
+        public Expirer()
+        {
+            super( "PooledByteBufferExpirer" );
+            setDaemon( true );
+        }
+        
+        public void run()
+        {
+            // Expire unused buffers every seconds
+            for( ;; )
+            {
+                try
+                {
+                    Thread.sleep( 1000 );
+                }
+                catch ( InterruptedException e )
+                {
+                }
+
+                // Check if expiration is disabled.
+                long timeout = getTimeoutMillis();
+                if( timeout <= 0L )
+                {
+                    continue;
+                }
+
+                // Expire old buffers
+                long expirationTime = System.currentTimeMillis() - timeout;
+                synchronized( containerStack )
+                {
+                    containerStack.expireBefore( expirationTime );
+                }
+                
+                for( int i = directBufferStacks.length - 1; i >= 0; i -- )
+                {
+                    ExpiringStack stack = directBufferStacks[ i ];
+                    synchronized( stack )
+                    {
+                        stack.expireBefore( expirationTime );
+                    }
+                }
+
+                for( int i = heapBufferStacks.length - 1; i >= 0; i -- )
+                {
+                    ExpiringStack stack = heapBufferStacks[ i ];
+                    synchronized( stack )
+                    {
+                        stack.expireBefore( expirationTime );
+                    }
+                }
+            }
+        }
+    }
+
     private class PooledByteBuffer extends ByteBuffer
     {
         private java.nio.ByteBuffer buf;
         private int refCount = 1;
         private boolean autoExpand;
         private boolean pooled;
+        private long timestamp;
 
         protected PooledByteBuffer()
         {
         }
-
+        
         private synchronized void init( java.nio.ByteBuffer buf, boolean clear )
         {
             this.buf = buf;
@@ -212,6 +308,9 @@
                 release0( buf );
             }
 
+            // Update timestamp.
+            timestamp = System.currentTimeMillis();
+
             synchronized( containerStack )
             {
                 containerStack.push( this );
@@ -252,6 +351,11 @@
         public void setPooled( boolean pooled )
         {
             this.pooled = pooled;
+        }
+
+        public long getTimestamp()
+        {
+            return timestamp;
         }
 
         public int capacity()

Added: directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringStack.java
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringStack.java?rev=385038&view=auto
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringStack.java (added)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringStack.java Sat Mar 11 00:29:23 2006
@@ -0,0 +1,173 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.util;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * A unbounded stack with expiration.
+ * 
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class ExpiringStack implements Serializable
+{
+    private static final long serialVersionUID = 3546919169401434168L;
+    private static final int DEFAULT_CAPACITY = 4;
+
+    private Object[] items;
+    private long[] timestamps;
+    private int size = 0;
+
+    /**
+     * Construct a new, empty stack.
+     */
+    public ExpiringStack()
+    {
+        items = new Object[ DEFAULT_CAPACITY ];
+        timestamps = new long[ DEFAULT_CAPACITY ];
+    }
+
+    /**
+     * Clears this stack.
+     */
+    public void clear()
+    {
+        Arrays.fill( items, null );
+        size = 0;
+    }
+
+    /**
+     * Pops from this stack.
+     * 
+     * @return <code>null</code>, if this stack is empty or the element is
+     *         really <code>null</code>.
+     */
+    public Object pop()
+    {
+        if( size == 0 )
+        {
+            return null;
+        }
+
+        int pos = size - 1;
+        Object ret = items[ pos ];
+        items[ pos ] = null;
+        size--;
+
+        return ret;
+    }
+
+    /**
+     * Push into this stack.
+     */
+    public void push( Object obj )
+    {
+        if( size == items.length )
+        {
+            // expand queue
+            final int oldLen = items.length;
+            Object[] tmp = new Object[ oldLen * 2 ];
+            System.arraycopy( items, 0, tmp, 0, size );
+            items = tmp;
+        }
+
+        items[ size ] = obj;
+        timestamps[ size ] = System.currentTimeMillis();
+        size++;
+    }
+
+    public void remove( Object o )
+    {
+        for( int i = size - 1; i >= 0; i-- )
+        {
+            if( items[ i ] == o )
+            {
+                System.arraycopy( items, i + 1, items, i, size - i - 1 );
+                System.arraycopy( timestamps, i + 1, timestamps, i, size - i - 1 );
+                items[ size - 1 ] = null;
+                size--;
+                break;
+            }
+        }
+    }
+    
+    public void expireBefore( long time )
+    {
+        int i; 
+        for( i = 0; i < size; i ++ )
+        {
+            if( timestamps[ i ] >= time )
+            {
+                break;
+            }
+        }
+        
+        if( i > 0 )
+        {
+            size -= i;
+            System.arraycopy( items, i, items, 0, size );
+            System.arraycopy( timestamps, i, timestamps, 0, size );
+            Arrays.fill( items, size, items.length, null );
+        }
+    }
+
+    /**
+     * Returns the first element of the stack.
+     * 
+     * @return <code>null</code>, if the stack is empty, or the element is
+     *         really <code>null</code>.
+     */
+    public Object first()
+    {
+        if( size == 0 )
+        {
+            return null;
+        }
+
+        return items[ size - 1 ];
+    }
+
+    public Object last()
+    {
+        if( size == 0 )
+        {
+            return null;
+        }
+
+        return items[ 0 ];
+    }
+
+    /**
+     * Returns <code>true</code> if the stack is empty.
+     */
+    public boolean isEmpty()
+    {
+        return ( size == 0 );
+    }
+
+    /**
+     * Returns the number of elements in the stack.
+     */
+    public int size()
+    {
+        return size;
+    }
+}
\ No newline at end of file

Propchange: directory/trunks/mina/core/src/main/java/org/apache/mina/util/ExpiringStack.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Modified: directory/trunks/mina/core/src/test/java/org/apache/mina/common/ByteBufferTest.java
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/test/java/org/apache/mina/common/ByteBufferTest.java?rev=385038&r1=385037&r2=385038&view=diff
==============================================================================
--- directory/trunks/mina/core/src/test/java/org/apache/mina/common/ByteBufferTest.java (original)
+++ directory/trunks/mina/core/src/test/java/org/apache/mina/common/ByteBufferTest.java Sat Mar 11 00:29:23 2006
@@ -524,4 +524,29 @@
         Assert.assertEquals( 10, buf.limit() );
         Assert.assertEquals( 10, buf.capacity() );
     }
+    
+    public void testPoolExpiration() throws Exception
+    {
+        PooledByteBufferAllocator allocator =
+            ( PooledByteBufferAllocator ) ByteBuffer.getAllocator();
+        
+        // Make a buffer pooled.
+        ByteBuffer buf = ByteBuffer.allocate( 16 );
+        buf.release();
+        
+        // Let everything flushed.
+        allocator.setTimeout( 1 );
+        Thread.sleep( 2000 );
+        
+        // Make sure old buffers are flushed.
+        Assert.assertNotSame( buf, ByteBuffer.allocate( 16 ) );
+        
+        // Make sure new buffers are not flushed.
+        allocator.setTimeout( 10 );
+        buf = ByteBuffer.allocate( 16 );
+        buf.release();
+        Thread.sleep( 2000 );
+        Assert.assertSame( buf, ByteBuffer.allocate( 16 ) );
+
+    }
 }