You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2008/04/14 09:06:24 UTC

svn commit: r647676 - in /mina/trunk/core/src/main/java/org/apache/mina/filter/executor: IoEventQueueHandler.java IoEventQueueThrottle.java

Author: trustin
Date: Mon Apr 14 00:06:22 2008
New Revision: 647676

URL: http://svn.apache.org/viewvc?rev=647676&view=rev
Log:
Changed IoEventQueueHandler to accept any event source other than ThreadPoolExecutor - will solve writer-side OOM issue with this interface

Modified:
    mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
    mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java?rev=647676&r1=647675&r2=647676&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java Mon Apr 14 00:06:22 2008
@@ -20,7 +20,6 @@
 package org.apache.mina.filter.executor;
 
 import java.util.EventListener;
-import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.mina.common.IoEvent;
 
@@ -37,13 +36,13 @@
      * A dummy handler which always accepts event doing nothing particular.
      */
     static IoEventQueueHandler NOOP = new IoEventQueueHandler() {
-        public boolean accept(ThreadPoolExecutor executor, IoEvent event) {
+        public boolean accept(Object source, IoEvent event) {
             return true;
         }
-        public void offered(ThreadPoolExecutor executor, IoEvent event) {
+        public void offered(Object source, IoEvent event) {
             // NOOP
         }
-        public void polled(ThreadPoolExecutor executor, IoEvent event) {
+        public void polled(Object source, IoEvent event) {
             // NOOP
         }
     };
@@ -53,17 +52,17 @@
      * allowed to be offered to the event queue.  The <tt>event</tt> is dropped
      * if <tt>false</tt> is returned.
      */
-    boolean accept(ThreadPoolExecutor executor, IoEvent event);
+    boolean accept(Object source, IoEvent event);
 
     /**
      * Invoked after the specified <tt>event</tt> has been offered to the
      * event queue.
      */
-    void offered(ThreadPoolExecutor executor, IoEvent event);
+    void offered(Object source, IoEvent event);
 
     /**
      * Invoked after the specified <tt>event</tt> has been polled from the
      * event queue.
      */
-    void polled(ThreadPoolExecutor executor, IoEvent event);
+    void polled(Object source, IoEvent event);
 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java?rev=647676&r1=647675&r2=647676&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java Mon Apr 14 00:06:22 2008
@@ -19,7 +19,6 @@
  */
 package org.apache.mina.filter.executor;
 
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.mina.common.IoEvent;
@@ -27,19 +26,18 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * Throttles incoming events into {@link OrderedThreadPoolExecutor} or
- * {@link UnorderedThreadPoolExecutor}.
- * 
+ * Throttles incoming or outgoing events.
+ *
  * @author The Apache MINA Project (dev@mina.apache.org)
  * @version $Rev$, $Date$
  */
 public class IoEventQueueThrottle implements IoEventQueueHandler {
-    
+
     private final Logger logger = LoggerFactory.getLogger(getClass());
-    
+
     private final IoEventSizeEstimator eventSizeEstimator;
     private volatile int threshold;
-    
+
     private final Object lock = new Object();
     private final AtomicInteger counter = new AtomicInteger();
     private int waiters;
@@ -47,28 +45,28 @@
     public IoEventQueueThrottle() {
         this(new DefaultIoEventSizeEstimator(), 65536);
     }
-    
+
     public IoEventQueueThrottle(int threshold) {
         this(new DefaultIoEventSizeEstimator(), threshold);
     }
-    
+
     public IoEventQueueThrottle(IoEventSizeEstimator eventSizeEstimator, int threshold) {
         if (eventSizeEstimator == null) {
             throw new NullPointerException("eventSizeEstimator");
         }
         this.eventSizeEstimator = eventSizeEstimator;
-        
+
         setThreshold(threshold);
     }
-    
+
     public IoEventSizeEstimator getEventSizeEstimator() {
         return eventSizeEstimator;
     }
-    
+
     public int getThreshold() {
         return threshold;
     }
-    
+
     public int getCounter() {
         return counter.get();
     }
@@ -80,11 +78,11 @@
         this.threshold = threshold;
     }
 
-    public boolean accept(ThreadPoolExecutor executor, IoEvent event) {
+    public boolean accept(Object source, IoEvent event) {
         return true;
     }
-    
-    public void offered(ThreadPoolExecutor executor, IoEvent event) {
+
+    public void offered(Object source, IoEvent event) {
         int eventSize = estimateSize(event);
         int currentCounter = counter.addAndGet(eventSize);
         logState();
@@ -94,7 +92,7 @@
         }
     }
 
-    public void polled(ThreadPoolExecutor executor, IoEvent event) {
+    public void polled(Object source, IoEvent event) {
         int eventSize = estimateSize(event);
         int currentCounter = counter.addAndGet(-eventSize);
 
@@ -114,18 +112,18 @@
         }
         return size;
     }
-    
+
     private void logState() {
         if (logger.isDebugEnabled()) {
             logger.debug(Thread.currentThread().getName() + " state: " + counter.get() + " / " + getThreshold());
         }
     }
-    
+
     protected void block() {
         if (logger.isDebugEnabled()) {
             logger.debug(Thread.currentThread().getName() + " blocked: " + counter.get() + " >= " + threshold);
         }
-        
+
         synchronized (lock) {
             while (counter.get() >= threshold) {
                 waiters ++;
@@ -138,12 +136,12 @@
                 }
             }
         }
-        
+
         if (logger.isDebugEnabled()) {
             logger.debug(Thread.currentThread().getName() + " unblocked: " + counter.get() + " < " + threshold);
         }
     }
-    
+
     protected void unblock() {
         synchronized (lock) {
             if (waiters > 0) {