You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/11/09 05:37:43 UTC
svn commit: r593420 - in
/mina/trunk/core/src/main/java/org/apache/mina/filter/executor:
ExecutorFilter.java IoEventQueueHandler.java OrderedThreadPoolExecutor.java
Author: trustin
Date: Thu Nov 8 20:37:42 2007
New Revision: 593420
URL: http://svn.apache.org/viewvc?rev=593420&view=rev
Log:
* Added IoEventQueueHandler
* OrderedThreadPoolExecutor invokes IoEventQueueHandler on any queue events now.
Added:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java (with props)
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java?rev=593420&r1=593419&r2=593420&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java Thu Nov 8 20:37:42 2007
@@ -26,10 +26,8 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoEventType;
@@ -103,6 +101,10 @@
* chain.addLast("executor2", new ExecutorFilter(IoEventType.WRITE));
* </code></pre>
*
+ * <h2>Preventing {@link OutOfMemoryError}</h2>
+ *
+ * Please refer to {@link OrderedThreadPoolExecutor} and {@link IoEventQueueHandler}.
+ *
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$
*/
@@ -152,8 +154,8 @@
public ExecutorFilter(
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
- RejectedExecutionHandler handler) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), handler, (IoEventType[]) null);
+ IoEventQueueHandler queueHandler) {
+ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler, (IoEventType[]) null);
}
/**
@@ -164,7 +166,7 @@
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, new AbortPolicy(), (IoEventType[]) null);
+ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null, (IoEventType[]) null);
}
/**
@@ -174,8 +176,8 @@
public ExecutorFilter(
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
- ThreadFactory threadFactory, RejectedExecutionHandler handler) {
- this(new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, handler), true, (IoEventType[]) null);
+ ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
+ this(new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, queueHandler), true, (IoEventType[]) null);
}
/**
@@ -218,8 +220,8 @@
public ExecutorFilter(
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
- RejectedExecutionHandler handler, IoEventType... eventTypes) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), handler, eventTypes);
+ IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
+ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler, eventTypes);
}
/**
@@ -230,7 +232,7 @@
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory, IoEventType... eventTypes) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, new AbortPolicy(), eventTypes);
+ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null, eventTypes);
}
/**
@@ -240,8 +242,8 @@
public ExecutorFilter(
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
- ThreadFactory threadFactory, RejectedExecutionHandler handler, IoEventType... eventTypes) {
- this(new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, handler), true, eventTypes);
+ ThreadFactory threadFactory, IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
+ this(new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, queueHandler), true, eventTypes);
}
/**
Added: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java?rev=593420&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java Thu Nov 8 20:37:42 2007
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import java.util.EventListener;
+
+import org.apache.mina.common.IoEvent;
+
+/**
+ * Listens to all event queue operations occurring in {@link OrderedThreadPoolExecutor}.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public interface IoEventQueueHandler extends EventListener {
+ /**
+ * Returns <tt>true</tt> if and only if the specified <tt>event</tt> is
+ * allowed to be offered to the event queue. The <tt>event</tt> is dropped
+ * if <tt>false</tt> is returned.
+ */
+ boolean accept(OrderedThreadPoolExecutor executor, IoEvent event);
+
+ /**
+ * Invoked after the specified <tt>event</tt> has been offered to the
+ * event queue.
+ */
+ void offered(OrderedThreadPoolExecutor executor, IoEvent event);
+
+ /**
+ * Invoked after the specified <tt>event</tt> has been polled from the
+ * event queue.
+ */
+ void polled(OrderedThreadPoolExecutor executor, IoEvent event);
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java?rev=593420&r1=593419&r2=593420&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java Thu Nov 8 20:37:42 2007
@@ -49,8 +49,15 @@
public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
private static final IoSession EXIT_SIGNAL = new DummySession();
+ private static final IoEventQueueHandler NOOP_QUEUE_MONITOR = new IoEventQueueHandler() {
+ public boolean accept(OrderedThreadPoolExecutor executor, IoEvent event) {
+ return true;
+ }
+ public void offered(OrderedThreadPoolExecutor executor, IoEvent event) {}
+ public void polled(OrderedThreadPoolExecutor executor, IoEvent event) {}
+ };
- private final AttributeKey BUFFER = new AttributeKey(getClass(), "queue");
+ private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
private final Set<Worker> workers = new HashSet<Worker>();
@@ -61,9 +68,10 @@
private final AtomicInteger idleWorkers = new AtomicInteger();
private long completedTaskCount;
-
private volatile boolean shutdown;
+ private volatile IoEventQueueHandler queueHandler;
+
public OrderedThreadPoolExecutor(int maximumPoolSize) {
this(0, maximumPoolSize);
}
@@ -80,22 +88,22 @@
public OrderedThreadPoolExecutor(
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
- RejectedExecutionHandler handler) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), handler);
+ IoEventQueueHandler queueMonitor) {
+ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueMonitor);
}
public OrderedThreadPoolExecutor(
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, new AbortPolicy());
+ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
}
public OrderedThreadPoolExecutor(
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
- ThreadFactory threadFactory, RejectedExecutionHandler handler) {
- super(0, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, handler);
+ ThreadFactory threadFactory, IoEventQueueHandler queueMonitor) {
+ super(0, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy());
if (corePoolSize < 0) {
throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
}
@@ -106,8 +114,25 @@
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
+ setQueueHandler(queueMonitor);
}
+ public IoEventQueueHandler getQueueHandler() {
+ return queueHandler;
+ }
+
+ public void setQueueHandler(IoEventQueueHandler queueHandler) {
+ if (queueHandler == null) {
+ queueHandler = NOOP_QUEUE_MONITOR;
+ }
+ this.queueHandler = queueHandler;
+ }
+
+ @Override
+ public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
+ // Ignore the request. It must always be AbortPolicy.
+ }
+
private void addWorker() {
synchronized (workers) {
if (workers.size() >= maximumPoolSize) {
@@ -253,10 +278,15 @@
Queue<Runnable> queue = buf.queue;
boolean offer;
synchronized (queue) {
- queue.offer(e);
- if (buf.processingCompleted) {
- buf.processingCompleted = false;
- offer = true;
+ if (queueHandler.accept(this, e)) {
+ queue.offer(e);
+ queueHandler.offered(this, e);
+ if (buf.processingCompleted) {
+ buf.processingCompleted = false;
+ offer = true;
+ } else {
+ offer = false;
+ }
} else {
offer = false;
}
@@ -345,7 +375,12 @@
}
}
}
-
+
+ @Override
+ public BlockingQueue<Runnable> getQueue() {
+ throw new UnsupportedOperationException("Please use getQueue(Runnable) instead.");
+ }
+
@Override
public void purge() {
}
@@ -482,6 +517,8 @@
break;
}
}
+
+ queueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
runTask(task);
}