You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/11/12 02:57:58 UTC
svn commit: r594003 - /mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
Author: trustin
Date: Sun Nov 11 17:57:56 2007
New Revision: 594003
URL: http://svn.apache.org/viewvc?rev=594003&view=rev
Log:
Changed ReadThrottleFilter to use ScheduledExecutorService to resume stalled sessions.
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java?rev=594003&r1=594002&r2=594003&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java Sun Nov 11 17:57:56 2007
@@ -20,6 +20,9 @@
package org.apache.mina.filter.traffic;
import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.common.AttributeKey;
@@ -111,36 +114,50 @@
private volatile int maxGlobalBufferSize;
private final IoFilter enterFilter = new EnterFilter();
+
+ private final ScheduledExecutorService executor;
+ private ScheduledFuture<?> resumeOthersFuture;
+ private final AtomicInteger sessionCount = new AtomicInteger();
+ private final Runnable resumeOthersTask = new Runnable() {
+ public void run() {
+ resumeOthers();
+ }
+ };
/**
* Creates a new instance with 64KB <tt>maxSessionBufferSize</tt>,
* 128MB <tt>maxGlobalBufferSize</tt> and a new {@link DefaultMessageSizeEstimator}.
*/
- public ReadThrottleFilter() {
- this(ReadThrottlePolicy.LOG);
+ public ReadThrottleFilter(ScheduledExecutorService executor) {
+ this(executor, ReadThrottlePolicy.LOG);
}
- public ReadThrottleFilter(ReadThrottlePolicy policy) {
- this(policy, null);
+ public ReadThrottleFilter(
+ ScheduledExecutorService executor, ReadThrottlePolicy policy) {
+ this(executor, policy, null);
}
- public ReadThrottleFilter(ReadThrottlePolicy policy, MessageSizeEstimator messageSizeEstimator) {
+ public ReadThrottleFilter(
+ ScheduledExecutorService executor,
+ ReadThrottlePolicy policy, MessageSizeEstimator messageSizeEstimator) {
// 64KB, 64MB, 128MB.
- this(policy, messageSizeEstimator, 65536, 1048576 * 64, 1048576 * 128);
+ this(executor, policy, messageSizeEstimator, 65536, 1048576 * 64, 1048576 * 128);
}
/**
* Creates a new instance with the specified <tt>maxSessionBufferSize</tt>,
* <tt>maxGlobalBufferSize</tt> and a new {@link DefaultMessageSizeEstimator}.
*/
- public ReadThrottleFilter(int maxSessionBufferSize, int maxServiceBufferSize, int maxGlobalBufferSize) {
- this(ReadThrottlePolicy.LOG, maxSessionBufferSize, maxServiceBufferSize, maxGlobalBufferSize);
+ public ReadThrottleFilter(
+ ScheduledExecutorService executor,
+ int maxSessionBufferSize, int maxServiceBufferSize, int maxGlobalBufferSize) {
+ this(executor, ReadThrottlePolicy.LOG, maxSessionBufferSize, maxServiceBufferSize, maxGlobalBufferSize);
}
public ReadThrottleFilter(
- ReadThrottlePolicy policy,
+ ScheduledExecutorService executor, ReadThrottlePolicy policy,
int maxSessionBufferSize, int maxServiceBufferSize, int maxGlobalBufferSize) {
- this(policy, null, maxSessionBufferSize, maxServiceBufferSize, maxGlobalBufferSize);
+ this(executor, policy, null, maxSessionBufferSize, maxServiceBufferSize, maxGlobalBufferSize);
}
/**
@@ -159,11 +176,13 @@
* a new {@link DefaultMessageSizeEstimator} is created.
*/
public ReadThrottleFilter(
+ ScheduledExecutorService executor,
ReadThrottlePolicy policy, MessageSizeEstimator messageSizeEstimator,
int maxSessionBufferSize, int maxServiceBufferSize, int maxGlobalBufferSize) {
if (messageSizeEstimator == null) {
messageSizeEstimator = new DefaultMessageSizeEstimator();
}
+ this.executor = executor;
this.messageSizeEstimator = messageSizeEstimator;
setPolicy(policy);
setMaxSessionBufferSize(maxSessionBufferSize);
@@ -293,6 +312,14 @@
// Add an entering filter before the ExecutorFilter.
parent.getEntry(lastFilter).addBefore(name + ".preprocessor", enterFilter);
+
+ int previousSessionCount = sessionCount.getAndIncrement();
+ if (previousSessionCount == 0) {
+ synchronized (resumeOthersTask) {
+ resumeOthersFuture = executor.scheduleWithFixedDelay(
+ resumeOthersTask, 1000, 1000, TimeUnit.MILLISECONDS);
+ }
+ }
}
@Override
@@ -304,6 +331,14 @@
} catch (Exception e) {
// Ignore.
}
+
+ int currentSessionCount = sessionCount.decrementAndGet();
+ if (currentSessionCount == 0) {
+ synchronized (resumeOthersTask) {
+ resumeOthersFuture.cancel(false);
+ resumeOthersFuture = null;
+ }
+ }
}
@Override
@@ -498,11 +533,10 @@
if (maxGlobalBufferSize == 0 || globalBufferSize.get() < maxGlobalBufferSize) {
for (IoService service: serviceBufferSizes.keySet()) {
resumeService(service);
+ synchronized (globalResumeLock) {
+ lastGlobalResumeTime = System.currentTimeMillis();
+ }
}
- }
-
- synchronized (globalResumeLock) {
- lastGlobalResumeTime = System.currentTimeMillis();
}
}
}