You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/11/12 02:57:58 UTC

svn commit: r594003 - /mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java

Author: trustin
Date: Sun Nov 11 17:57:56 2007
New Revision: 594003

URL: http://svn.apache.org/viewvc?rev=594003&view=rev
Log:
Changed ReadThrottleFilter to use ScheduledExecutorService to resume stalled sessions.

Modified:
    mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java?rev=594003&r1=594002&r2=594003&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java Sun Nov 11 17:57:56 2007
@@ -20,6 +20,9 @@
 package org.apache.mina.filter.traffic;
 
 import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.mina.common.AttributeKey;
@@ -111,36 +114,50 @@
     private volatile int maxGlobalBufferSize;
     
     private final IoFilter enterFilter = new EnterFilter();
+    
+    private final ScheduledExecutorService executor;
+    private ScheduledFuture<?> resumeOthersFuture;
+    private final AtomicInteger sessionCount = new AtomicInteger();
+    private final Runnable resumeOthersTask = new Runnable() {
+        public void run() {
+            resumeOthers();
+        }
+    };
 
     /**
      * Creates a new instance with 64KB <tt>maxSessionBufferSize</tt>,
      * 128MB <tt>maxGlobalBufferSize</tt> and a new {@link DefaultMessageSizeEstimator}.
      */
-    public ReadThrottleFilter() {
-        this(ReadThrottlePolicy.LOG);
+    public ReadThrottleFilter(ScheduledExecutorService executor) {
+        this(executor, ReadThrottlePolicy.LOG);
     }
     
-    public ReadThrottleFilter(ReadThrottlePolicy policy) {
-        this(policy, null);
+    public ReadThrottleFilter(
+            ScheduledExecutorService executor, ReadThrottlePolicy policy) {
+        this(executor, policy, null);
     }
     
-    public ReadThrottleFilter(ReadThrottlePolicy policy, MessageSizeEstimator messageSizeEstimator) {
+    public ReadThrottleFilter(
+            ScheduledExecutorService executor,
+            ReadThrottlePolicy policy, MessageSizeEstimator messageSizeEstimator) {
         // 64KB, 64MB, 128MB.
-        this(policy, messageSizeEstimator, 65536, 1048576 * 64, 1048576 * 128);
+        this(executor, policy, messageSizeEstimator, 65536, 1048576 * 64, 1048576 * 128);
     }
     
     /**
      * Creates a new instance with the specified <tt>maxSessionBufferSize</tt>,
      * <tt>maxGlobalBufferSize</tt> and a new {@link DefaultMessageSizeEstimator}.
      */
-    public ReadThrottleFilter(int maxSessionBufferSize, int maxServiceBufferSize, int maxGlobalBufferSize) {
-        this(ReadThrottlePolicy.LOG, maxSessionBufferSize, maxServiceBufferSize, maxGlobalBufferSize);
+    public ReadThrottleFilter(
+            ScheduledExecutorService executor,
+            int maxSessionBufferSize, int maxServiceBufferSize, int maxGlobalBufferSize) {
+        this(executor, ReadThrottlePolicy.LOG, maxSessionBufferSize, maxServiceBufferSize, maxGlobalBufferSize);
     }
 
     public ReadThrottleFilter(
-            ReadThrottlePolicy policy,
+            ScheduledExecutorService executor, ReadThrottlePolicy policy,
             int maxSessionBufferSize, int maxServiceBufferSize, int maxGlobalBufferSize) {
-        this(policy, null, maxSessionBufferSize, maxServiceBufferSize, maxGlobalBufferSize);
+        this(executor, policy, null, maxSessionBufferSize, maxServiceBufferSize, maxGlobalBufferSize);
     }
 
     /**
@@ -159,11 +176,13 @@
      *                             a new {@link DefaultMessageSizeEstimator} is created.
      */
     public ReadThrottleFilter(
+            ScheduledExecutorService executor, 
             ReadThrottlePolicy policy, MessageSizeEstimator messageSizeEstimator,
             int maxSessionBufferSize, int maxServiceBufferSize, int maxGlobalBufferSize) {
         if (messageSizeEstimator == null) {
             messageSizeEstimator = new DefaultMessageSizeEstimator();
         }
+        this.executor = executor;
         this.messageSizeEstimator = messageSizeEstimator;
         setPolicy(policy);
         setMaxSessionBufferSize(maxSessionBufferSize);
@@ -293,6 +312,14 @@
         
         // Add an entering filter before the ExecutorFilter.
         parent.getEntry(lastFilter).addBefore(name + ".preprocessor", enterFilter);
+        
+        int previousSessionCount = sessionCount.getAndIncrement();
+        if (previousSessionCount == 0) {
+            synchronized (resumeOthersTask) {
+                resumeOthersFuture = executor.scheduleWithFixedDelay(
+                        resumeOthersTask, 1000, 1000, TimeUnit.MILLISECONDS);
+            }
+        }
     }
 
     @Override
@@ -304,6 +331,14 @@
         } catch (Exception e) {
             // Ignore.
         }
+        
+        int currentSessionCount = sessionCount.decrementAndGet();
+        if (currentSessionCount == 0) {
+            synchronized (resumeOthersTask) {
+                resumeOthersFuture.cancel(false);
+                resumeOthersFuture = null;
+            }
+        }
     }
 
     @Override
@@ -498,11 +533,10 @@
             if (maxGlobalBufferSize == 0 || globalBufferSize.get() < maxGlobalBufferSize) {
                 for (IoService service: serviceBufferSizes.keySet()) {
                     resumeService(service);
+                    synchronized (globalResumeLock) {
+                        lastGlobalResumeTime = System.currentTimeMillis();
+                    }
                 }
-            }
-            
-            synchronized (globalResumeLock) {
-                lastGlobalResumeTime = System.currentTimeMillis();
             }
         }
     }