You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2010/03/08 18:53:52 UTC

svn commit: r920419 - /openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java

Author: dblevins
Date: Mon Mar  8 17:53:51 2010
New Revision: 920419

URL: http://svn.apache.org/viewvc?rev=920419&view=rev
Log:
More options for OPENEJB-1235
This adds a "MaxAge" option and an "IdleTimeout" option.
MaxAge is like the DNS time-to-live option: beans are removed once they are too old.
IdleTimeout is a simple timeout based on the time elapsed since the last invocation.

Modified:
    openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java

Modified: openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java?rev=920419&r1=920418&r2=920419&view=diff
==============================================================================
--- openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java (original)
+++ openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java Mon Mar  8 17:53:51 2010
@@ -17,11 +17,20 @@
 package org.apache.openejb.util;
 
 import java.lang.ref.SoftReference;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -43,8 +52,17 @@
     private final Semaphore maxPolicy;
     private final Semaphore minPolicy;
     private final int max;
+    private final Executor executor;
 
+    private final Supplier<T> supplier = null;
+    
     public Pool(int max, int min, boolean strict) {
+        this(max, min, strict, 0, 0, 0, null);
+    }
+
+    public Pool(int max, int min, boolean strict, long maxAge, long idleTimeout, long interval, Executor executor) {
+        if (min > max) greater("max", max, "min", min);
+        if (maxAge != 0 && idleTimeout > maxAge) greater("MaxAge", maxAge, "IdleTimeout", idleTimeout);
         this.max = max;
         this.minPolicy = new Semaphore(min);
         if (strict) {
@@ -52,6 +70,28 @@
         } else {
             this.maxPolicy = null;
         }
+
+        if (interval == 0) {
+            interval = 60 * 1000; // one minute
+        }
+
+        final boolean timeouts = maxAge > 0 || idleTimeout > 0;
+
+        this.executor = timeouts ? (executor != null) ? executor : createExecutor() : null;
+
+        if (timeouts) {
+
+            final Timer timer = new Timer("PoolEviction", true);
+            timer.scheduleAtFixedRate(new Eviction(maxAge, idleTimeout), idleTimeout, interval);
+        }
+    }
+
+    /**
+     * Creates the executor used for eviction callbacks when the caller of the
+     * constructor did not supply one.
+     * <p/>
+     * The executor is built with a core size of 0, a maximum size of 10, a
+     * keep-alive of one hour and an unbounded {@link LinkedBlockingQueue}.
+     * <p/>
+     * NOTE(review): the ThreadPoolExecutor documentation states that with an
+     * unbounded queue no more than corePoolSize threads are ever created, so
+     * the maximum of 10 presumably never takes effect -- confirm the intended
+     * sizing.
+     *
+     * @return a new executor; never null
+     */
+    private ThreadPoolExecutor createExecutor() {
+        return new ThreadPoolExecutor(0, 10, 60 * 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+    }
+
+    /**
+     * Reports that a value which must not exceed another one does exceed it.
+     * This method never returns normally.
+     *
+     * @param maxName name of the upper bound, as shown in the message
+     * @param max     value of the upper bound
+     * @param minName name of the value that must not exceed the upper bound
+     * @param min     the offending value
+     * @throws IllegalArgumentException always
+     */
+    private void greater(String maxName, long max, String minName, long min) {
+        final StringBuilder message = new StringBuilder();
+        message.append(minName).append(" cannot be greater than ").append(maxName);
+        message.append(": ");
+        message.append(minName).append("=").append(min);
+        message.append(", ");
+        message.append(maxName).append("=").append(max);
+        throw new IllegalArgumentException(message.toString());
     }
 
     /**
@@ -59,11 +99,12 @@
      * <p/>
      * A pop() call that returns null is considered successful.
      *
-     * @param timeout
-     * @param unit
-     * @return
-     * @throws InterruptedException
+     * @param timeout time to block while waiting for an instance
+     * @param unit    unit of time in which the timeout is expressed
+     * @return an entry from the pool or null indicating permission to create and push() an instance into the pool
+     * @throws InterruptedException  vm level thread interruption
      * @throws IllegalStateException if a permit could not be acquired
+     * @throws TimeoutException      if no instance could be obtained within the timeout
      */
     public Entry<T> pop(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
         if (maxPolicy != null) {
@@ -99,7 +140,7 @@
     /**
      * Attempt to aquire a permit to add the object to the pool.
      *
-     * @param obj
+     * @param obj object to add to the pool
      * @return true of the item as added
      */
     public boolean add(T obj) {
@@ -107,13 +148,24 @@
     }
 
     /**
+     * Attempts to acquire a permit and, if one is granted, wraps the object
+     * in a new Entry and pushes it onto the pool.
+     * <p/>
+     * When the pool has no "max" policy no permit is required and the object
+     * is pushed directly.
+     *
+     * @param obj    object to add to the pool
+     * @param offset value added to the current time in milliseconds to form
+     *               the entry's creation timestamp, used for maxAge
+     * @return false if no permit could be acquired, otherwise the result of
+     *         pushing the new entry
+     */
+    public boolean add(T obj, int offset) {
+        if (maxPolicy != null && !maxPolicy.tryAcquire()) {
+            return false;
+        }
+
+        final Entry<T> entry = new Entry<T>(obj, offset);
+        return push(entry);
+    }
+
+    /**
      * Never call this method without having successfully called
      * {@link #pop(long, java.util.concurrent.TimeUnit)} beforehand.
      * <p/>
      * Failure to do so will increase the max pool size by one.
      *
-     * @param obj
-     * @return
+     * @param obj object to push onto the pool
+     * @return false if the pool max size was exceeded
      */
     public boolean push(T obj) {
         return push(new Entry<T>(obj));
@@ -125,7 +177,7 @@
      * <p/>
      * Failure to do so will increase the max pool size by one.
      *
-     * @param entry
+     * @param entry entry that was popped from the pool
      * @return true of the item as added
      */
     public boolean push(Entry<T> entry) {
@@ -183,7 +235,32 @@
         if (maxPolicy != null) maxPolicy.release();
     }
 
+    /**
+     * This internal method allows us to "swap" the status
+     * of two entries before returning them to the pool.
+     * <p/>
+     * This allows us to elect a replacement in the min pool
+     * without ever losing pool consistency.
+     * <p/>
+     * The replacement is given a hard reference and pushed first;
+     * only then is the hard reference of the discarded entry cleared
+     * and the entry discarded.
+     * <p/>
+     * Neither argument is allowed to be null.
+     *
+     * @param hard the "min" pool item that will be discarded
+     * @param weak the item without a hard reference that is promoted
+     *             into the "min" pool to replace the discarded instance
+     */
+    private void discardAndReplace(Entry<T> hard, Entry<T> weak) {
+        // The replacement gets a hard reference to its own instance -- it is now a "min" pool item
+        weak.hard.set(weak.get());
+        push(weak);
+
+        // The discarded item loses its hard reference, leaving only its soft reference
+        hard.hard.set(null);
+        discard(hard);
+    }
+
     public static class Entry<T> {
+        private final long created;
+        private long used;
+
         private final SoftReference<T> soft;
         private final AtomicReference<T> hard = new AtomicReference<T>();
 
@@ -198,14 +275,32 @@
          * object wrapped by this Entry.
          * <p/>
          * This helps ensure that when an Entry is returned to the pool it is
-         * always safe to call {@link Semaphore#release()} which increases the
+         * always safe to call {@link java.util.concurrent.Semaphore#release()} which increases the
          * permit size by one.
          *
-         * @param obj
+         * @param obj object that this Entry will wrap
          */
         private Entry(T obj) {
+            this(obj, 0);
+        }
+
+        /**
+         * Constructor is private so that it is impossible for an Entry object
+         * to exist without there being a corresponding permit issued for the
+         * object wrapped by this Entry.
+         * <p/>
+         * This helps ensure that when an Entry is returned to the pool it is
+         * always safe to call {@link Semaphore#release()} which increases the
+         * permit size by one.
+         *
+         * @param obj    object that this Entry will wrap
+         * @param offset creation time offset, used for maxAge
+         */
+        private Entry(T obj, int offset) {
             this.soft = new SoftReference<T>(obj);
             this.active.set(obj);
+            this.created = System.currentTimeMillis() + offset;
+            this.used = created;
         }
 
         public T get() {
@@ -215,10 +310,211 @@
         /**
          * Largely for testing purposes
          *
-         * @return
+         * @return true if this entry is in the "min" pool
          */
         public boolean hasHardReference() {
             return hard.get() != null;
         }
     }
+
+    /**
+     * Timer task that drains the pool, expires the entries that exceeded
+     * {@code maxAge} or {@code idleTimeout} and returns all other entries.
+     * <p/>
+     * Every expired entry is handed to the executor as a {@link Discard}
+     * task.  An expired "min" pool entry that still holds its hard reference
+     * at the end of the run (no surviving entry could take its place) also
+     * gets a {@link Replace} task queued.
+     */
+    private final class Eviction extends TimerTask {
+
+        /** Maximum entry age in milliseconds; zero or less disables age based expiry. */
+        private final long maxAge;
+
+        /** Maximum idle time in milliseconds; zero or less disables idle based expiry. */
+        private final long idleTimeout;
+
+        private Eviction(long maxAge, long idleTimeout) {
+            this.maxAge = maxAge;
+            this.idleTimeout = idleTimeout;
+        }
+
+        public void run() {
+
+            final long now = System.currentTimeMillis();
+
+            final List<Entry<T>> entries = new ArrayList<Entry<T>>(max);
+
+            // Drain the pool: keep popping with a zero timeout until it times out
+            try {
+                while (true) entries.add(pop(0, TimeUnit.MILLISECONDS));
+            } catch (InterruptedException e) {
+                // Restore the interrupt status rather than dropping it.
+                // Whatever was popped so far is still processed below so
+                // that no permit is lost.
+                Thread.currentThread().interrupt();
+            } catch (TimeoutException e) {
+                // pool has been drained
+            }
+
+
+            final List<Expired> expiredList = new ArrayList<Expired>(max);
+
+            { // Expire aged instances
+
+                // Any "null" entries are immediately returned
+                // Any non-aged "min" refs are immediately returned
+
+                final Iterator<Entry<T>> iter = entries.iterator();
+                while (iter.hasNext()) {
+                    final Entry<T> entry = iter.next();
+
+                    if (entry == null) {
+                        // return the lock immediately
+                        push(entry);
+                        iter.remove();
+                        continue;
+                    }
+
+                    final long age = now - entry.created;
+
+                    if (maxAge > 0 && age > maxAge) {
+
+                        // Entry is too old, expire it
+
+                        iter.remove();
+                        final Expired expired = new Expired(entry);
+                        expiredList.add(expired);
+
+                        // Aged "min" pool entries are kept back so the
+                        // replacement step below can swap them out
+                        if (!expired.entry.hasHardReference()) {
+                            expired.tryDiscard();
+                        }
+
+                    } else if (entry.hasHardReference()) {
+                        // This is an item from the "minimum" pool
+                        // and therefore cannot timeout in the next
+                        // algorithm.  Return it immediately.
+                        push(entry);
+                        iter.remove();
+                    }
+                }
+            }
+
+            // At this point all Entries not eligible for idle timeout
+            // have been returned to the pool and can now be in use.
+
+            // There are no "null" and no min-pool ("hard") entries beyond
+            // this point.  Everything is a weak reference, possibly timed out.
+
+            // If items from the "min" pool have expired, we will need
+            // to return that number to the pool regardless of their
+            // timeout setting so that they may take the place of the
+            // expired instances
+
+            final Iterator<Expired> discardables = expiredList.iterator();
+            while (discardables.hasNext() && entries.size() > 0) {
+
+                if (discardables.next().replaceMinEntry(entries.get(0))) {
+                    entries.remove(0);
+                }
+
+            }
+
+            // At this point all the expired "min" pool refs will have
+            // been replaced with entries from our initial list.
+            //
+            // Unless, of course, we didn't have enough entries left over
+            // to fill the "min" pool deficit.  In that case, the entries
+            // list will be empty and this loop will do nothing.
+            final Iterator<Entry<T>> iter = entries.iterator();
+            while (iter.hasNext()) {
+
+                final Entry<T> entry = iter.next();
+
+                iter.remove(); // we know we're going to use it
+
+                final long idle = now - entry.used;
+
+                // An idleTimeout of zero means the idle check is disabled
+                // (only maxAge was configured).  Without this guard every
+                // entry idle for more than 0ms would be evicted on each run.
+                if (idleTimeout > 0 && idle > idleTimeout) {
+                    // too lazy -- timed out
+                    final Expired expired = new Expired(entry);
+
+                    expiredList.add(expired);
+
+                    expired.tryDiscard();
+
+                } else {
+                    push(entry);
+                }
+            }
+
+            // Ok, now we have the task of invoking callbacks
+            // on all the expired instances.
+            //
+            // If there are any "min" pool instances left over
+            // we need to queue up creation of a replacement
+
+            for (Expired expired : expiredList) {
+                executor.execute(new Discard(expired.entry));
+
+                if (expired.entry.hasHardReference()) {
+                    executor.execute(new Replace(expired.entry));
+                }
+            }
+
+        }
+
+        /**
+         * Pairs an expired entry with a flag recording whether it has already
+         * been discarded or replaced, so that only the first of
+         * {@link #tryDiscard()} and {@link #replaceMinEntry(Entry)} to claim
+         * the flag acts on the entry.
+         */
+        private class Expired {
+            private final Entry<T> entry;
+            private final AtomicBoolean discarded = new AtomicBoolean();
+
+            private Expired(Entry<T> entry) {
+                this.entry = entry;
+            }
+
+            /**
+             * Discards the entry unless it was already discarded or replaced.
+             *
+             * @return true if this call discarded the entry
+             */
+            public boolean tryDiscard() {
+                if (discarded.getAndSet(true)) return false;
+
+                discard(entry);
+
+                return true;
+            }
+
+
+            /**
+             * Swaps this expired "min" pool entry for the given replacement.
+             * Nothing happens when this entry holds no hard reference, when
+             * the replacement already holds one, or when this entry was
+             * already discarded or replaced.
+             *
+             * @param replacement entry that takes over the "min" pool slot
+             * @return true if the replacement was used
+             */
+            public boolean replaceMinEntry(Entry<T> replacement) {
+                if (!entry.hasHardReference()) return false;
+                if (replacement.hasHardReference()) return false;
+                if (discarded.getAndSet(true)) return false;
+
+                discardAndReplace(entry, replacement);
+
+                return true;
+            }
+        }
+    }
+
+    /**
+     * Task that creates a fresh instance through the supplier and pushes it
+     * onto the pool as a "min" pool entry (an entry holding a hard reference).
+     * <p/>
+     * If creating or pushing the instance fails for any reason, the expired
+     * entry is discarded instead.
+     * <p/>
+     * NOTE(review): the supplier field is initialised to null where it is
+     * declared, so in this revision create() presumably always fails with a
+     * NullPointerException and the fallback path is taken -- confirm once the
+     * supplier is wired in.
+     */
+    private class Replace implements Runnable {
+        private final Entry<T> expired;
+
+        private Replace(Entry<T> expired) {
+            this.expired = expired;
+        }
+
+        public void run() {
+            try {
+                final T t = supplier.create();
+                final Entry<T> entry = new Entry<T>(t);
+                // the hard reference is what marks the entry as a "min" pool item
+                entry.hard.set(t);
+                push(entry);
+            } catch (Throwable e) {
+                // Possibly re-try
+                // TODO: log creation failure
+                discard(expired);
+            }
+        }
+    }
+
+    /**
+     * Task that passes the instance wrapped by an expired entry to the
+     * supplier's discard callback.
+     */
+    private class Discard implements Runnable {
+        private final Entry<T> expired;
+
+        private Discard(Entry<T> expired) {
+            this.expired = expired;
+        }
+
+        public void run() {
+            final T instance = expired.get();
+            supplier.discard(instance);
+        }
+    }
+
+    /**
+     * Callback interface through which the pool creates replacement
+     * instances and hands over the instances of expired entries.
+     *
+     * @param <T> type of the pooled instances
+     */
+    public interface Supplier<T> {
+
+        /**
+         * Called to obtain a new instance that replaces an expired
+         * "min" pool instance.
+         *
+         * @return the newly created instance
+         */
+        T create();
+
+        /**
+         * Called with the instance held by an expired entry.
+         *
+         * @param t the instance of the expired entry
+         */
+        void discard(T t);
+
+    }
 }