You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2008/04/14 09:06:24 UTC
svn commit: r647676 - in
/mina/trunk/core/src/main/java/org/apache/mina/filter/executor:
IoEventQueueHandler.java IoEventQueueThrottle.java
Author: trustin
Date: Mon Apr 14 00:06:22 2008
New Revision: 647676
URL: http://svn.apache.org/viewvc?rev=647676&view=rev
Log:
Changed IoEventQueueHandler to accept any event source, not only a ThreadPoolExecutor - this interface can now be used to solve the writer-side OOM issue
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java?rev=647676&r1=647675&r2=647676&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java Mon Apr 14 00:06:22 2008
@@ -20,7 +20,6 @@
package org.apache.mina.filter.executor;
import java.util.EventListener;
-import java.util.concurrent.ThreadPoolExecutor;
import org.apache.mina.common.IoEvent;
@@ -37,13 +36,13 @@
* A dummy handler which always accepts event doing nothing particular.
*/
static IoEventQueueHandler NOOP = new IoEventQueueHandler() {
- public boolean accept(ThreadPoolExecutor executor, IoEvent event) {
+ public boolean accept(Object source, IoEvent event) {
return true;
}
- public void offered(ThreadPoolExecutor executor, IoEvent event) {
+ public void offered(Object source, IoEvent event) {
// NOOP
}
- public void polled(ThreadPoolExecutor executor, IoEvent event) {
+ public void polled(Object source, IoEvent event) {
// NOOP
}
};
@@ -53,17 +52,17 @@
* allowed to be offered to the event queue. The <tt>event</tt> is dropped
* if <tt>false</tt> is returned.
*/
- boolean accept(ThreadPoolExecutor executor, IoEvent event);
+ boolean accept(Object source, IoEvent event);
/**
* Invoked after the specified <tt>event</tt> has been offered to the
* event queue.
*/
- void offered(ThreadPoolExecutor executor, IoEvent event);
+ void offered(Object source, IoEvent event);
/**
* Invoked after the specified <tt>event</tt> has been polled from the
* event queue.
*/
- void polled(ThreadPoolExecutor executor, IoEvent event);
+ void polled(Object source, IoEvent event);
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java?rev=647676&r1=647675&r2=647676&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueThrottle.java Mon Apr 14 00:06:22 2008
@@ -19,7 +19,6 @@
*/
package org.apache.mina.filter.executor;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.common.IoEvent;
@@ -27,19 +26,18 @@
import org.slf4j.LoggerFactory;
/**
- * Throttles incoming events into {@link OrderedThreadPoolExecutor} or
- * {@link UnorderedThreadPoolExecutor}.
- *
+ * Throttles incoming or outgoing events.
+ *
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$
*/
public class IoEventQueueThrottle implements IoEventQueueHandler {
-
+
private final Logger logger = LoggerFactory.getLogger(getClass());
-
+
private final IoEventSizeEstimator eventSizeEstimator;
private volatile int threshold;
-
+
private final Object lock = new Object();
private final AtomicInteger counter = new AtomicInteger();
private int waiters;
@@ -47,28 +45,28 @@
public IoEventQueueThrottle() {
this(new DefaultIoEventSizeEstimator(), 65536);
}
-
+
public IoEventQueueThrottle(int threshold) {
this(new DefaultIoEventSizeEstimator(), threshold);
}
-
+
public IoEventQueueThrottle(IoEventSizeEstimator eventSizeEstimator, int threshold) {
if (eventSizeEstimator == null) {
throw new NullPointerException("eventSizeEstimator");
}
this.eventSizeEstimator = eventSizeEstimator;
-
+
setThreshold(threshold);
}
-
+
public IoEventSizeEstimator getEventSizeEstimator() {
return eventSizeEstimator;
}
-
+
public int getThreshold() {
return threshold;
}
-
+
public int getCounter() {
return counter.get();
}
@@ -80,11 +78,11 @@
this.threshold = threshold;
}
- public boolean accept(ThreadPoolExecutor executor, IoEvent event) {
+ public boolean accept(Object source, IoEvent event) {
return true;
}
-
- public void offered(ThreadPoolExecutor executor, IoEvent event) {
+
+ public void offered(Object source, IoEvent event) {
int eventSize = estimateSize(event);
int currentCounter = counter.addAndGet(eventSize);
logState();
@@ -94,7 +92,7 @@
}
}
- public void polled(ThreadPoolExecutor executor, IoEvent event) {
+ public void polled(Object source, IoEvent event) {
int eventSize = estimateSize(event);
int currentCounter = counter.addAndGet(-eventSize);
@@ -114,18 +112,18 @@
}
return size;
}
-
+
private void logState() {
if (logger.isDebugEnabled()) {
logger.debug(Thread.currentThread().getName() + " state: " + counter.get() + " / " + getThreshold());
}
}
-
+
protected void block() {
if (logger.isDebugEnabled()) {
logger.debug(Thread.currentThread().getName() + " blocked: " + counter.get() + " >= " + threshold);
}
-
+
synchronized (lock) {
while (counter.get() >= threshold) {
waiters ++;
@@ -138,12 +136,12 @@
}
}
}
-
+
if (logger.isDebugEnabled()) {
logger.debug(Thread.currentThread().getName() + " unblocked: " + counter.get() + " < " + threshold);
}
}
-
+
protected void unblock() {
synchronized (lock) {
if (waiters > 0) {