You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/11/09 05:37:43 UTC

svn commit: r593420 - in /mina/trunk/core/src/main/java/org/apache/mina/filter/executor: ExecutorFilter.java IoEventQueueHandler.java OrderedThreadPoolExecutor.java

Author: trustin
Date: Thu Nov  8 20:37:42 2007
New Revision: 593420

URL: http://svn.apache.org/viewvc?rev=593420&view=rev
Log:
* Added IoEventQueueHandler 
* OrderedThreadPoolExecutor invokes IoEventQueueHandler on any queue events now.

Added:
    mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java   (with props)
Modified:
    mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
    mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java?rev=593420&r1=593419&r2=593420&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java Thu Nov  8 20:37:42 2007
@@ -26,10 +26,8 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
 
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoEventType;
@@ -103,6 +101,10 @@
  * chain.addLast("executor2", new ExecutorFilter(IoEventType.WRITE));
  * </code></pre>
  * 
+ * <h2>Preventing {@link OutOfMemoryError}</h2>
+ * 
+ * Please refer to {@link OrderedThreadPoolExecutor} and {@link IoEventQueueHandler}.
+ * 
  * @author The Apache MINA Project (dev@mina.apache.org)
  * @version $Rev$, $Date$
  */
@@ -152,8 +154,8 @@
     public ExecutorFilter(
             int corePoolSize, int maximumPoolSize, 
             long keepAliveTime, TimeUnit unit,
-            RejectedExecutionHandler handler) {
-        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), handler, (IoEventType[]) null);
+            IoEventQueueHandler queueHandler) {
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler, (IoEventType[]) null);
     }
 
     /**
@@ -164,7 +166,7 @@
             int corePoolSize, int maximumPoolSize, 
             long keepAliveTime, TimeUnit unit,
             ThreadFactory threadFactory) {
-        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, new AbortPolicy(), (IoEventType[]) null);
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null, (IoEventType[]) null);
     }
 
     /**
@@ -174,8 +176,8 @@
     public ExecutorFilter(
             int corePoolSize, int maximumPoolSize, 
             long keepAliveTime, TimeUnit unit,
-            ThreadFactory threadFactory, RejectedExecutionHandler handler) {
-        this(new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, handler), true, (IoEventType[]) null);
+            ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
+        this(new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, queueHandler), true, (IoEventType[]) null);
     }
 
     /**
@@ -218,8 +220,8 @@
     public ExecutorFilter(
             int corePoolSize, int maximumPoolSize, 
             long keepAliveTime, TimeUnit unit,
-            RejectedExecutionHandler handler, IoEventType... eventTypes) {
-        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), handler, eventTypes);
+            IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler, eventTypes);
     }
 
     /**
@@ -230,7 +232,7 @@
             int corePoolSize, int maximumPoolSize, 
             long keepAliveTime, TimeUnit unit,
             ThreadFactory threadFactory, IoEventType... eventTypes) {
-        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, new AbortPolicy(), eventTypes);
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null, eventTypes);
     }
 
     /**
@@ -240,8 +242,8 @@
     public ExecutorFilter(
             int corePoolSize, int maximumPoolSize, 
             long keepAliveTime, TimeUnit unit,
-            ThreadFactory threadFactory, RejectedExecutionHandler handler, IoEventType... eventTypes) {
-        this(new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, handler), true, eventTypes);
+            ThreadFactory threadFactory, IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
+        this(new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, queueHandler), true, eventTypes);
     }
     
     /**

Added: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java?rev=593420&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java Thu Nov  8 20:37:42 2007
@@ -0,0 +1,51 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import java.util.EventListener;
+
+import org.apache.mina.common.IoEvent;
+
+/**
+ * Listens to all event queue operations occurring in {@link OrderedThreadPoolExecutor}.
+ * 
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public interface IoEventQueueHandler extends EventListener {
+    /**
+     * Returns <tt>true</tt> if and only if the specified <tt>event</tt> is
+     * allowed to be offered to the event queue.  The <tt>event</tt> is dropped
+     * if <tt>false</tt> is returned.
+     */
+    boolean accept(OrderedThreadPoolExecutor executor, IoEvent event);
+    
+    /**
+     * Invoked after the specified <tt>event</tt> has been offered to the
+     * event queue.
+     */
+    void offered(OrderedThreadPoolExecutor executor, IoEvent event);
+    
+    /**
+     * Invoked after the specified <tt>event</tt> has been polled from the
+     * event queue.
+     */
+    void polled(OrderedThreadPoolExecutor executor, IoEvent event);
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/IoEventQueueHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java?rev=593420&r1=593419&r2=593420&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java Thu Nov  8 20:37:42 2007
@@ -49,8 +49,15 @@
 public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
 
     private static final IoSession EXIT_SIGNAL = new DummySession();
+    private static final IoEventQueueHandler NOOP_QUEUE_MONITOR = new IoEventQueueHandler() {
+        public boolean accept(OrderedThreadPoolExecutor executor, IoEvent event) {
+            return true;
+        }
+        public void offered(OrderedThreadPoolExecutor executor, IoEvent event) {}
+        public void polled(OrderedThreadPoolExecutor executor, IoEvent event) {}
+    };
 
-    private final AttributeKey BUFFER = new AttributeKey(getClass(), "queue");
+    private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
     private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
     
     private final Set<Worker> workers = new HashSet<Worker>();
@@ -61,9 +68,10 @@
     private final AtomicInteger idleWorkers = new AtomicInteger();
     
     private long completedTaskCount;
-    
     private volatile boolean shutdown;
     
+    private volatile IoEventQueueHandler queueHandler;
+    
     public OrderedThreadPoolExecutor(int maximumPoolSize) {
         this(0, maximumPoolSize);
     }
@@ -80,22 +88,22 @@
     public OrderedThreadPoolExecutor(
             int corePoolSize, int maximumPoolSize, 
             long keepAliveTime, TimeUnit unit,
-            RejectedExecutionHandler handler) {
-        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), handler);
+            IoEventQueueHandler queueMonitor) {
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueMonitor);
     }
 
     public OrderedThreadPoolExecutor(
             int corePoolSize, int maximumPoolSize, 
             long keepAliveTime, TimeUnit unit,
             ThreadFactory threadFactory) {
-        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, new AbortPolicy());
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
     }
 
     public OrderedThreadPoolExecutor(
             int corePoolSize, int maximumPoolSize, 
             long keepAliveTime, TimeUnit unit,
-            ThreadFactory threadFactory, RejectedExecutionHandler handler) {
-        super(0, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, handler);
+            ThreadFactory threadFactory, IoEventQueueHandler queueMonitor) {
+        super(0, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy());
         if (corePoolSize < 0) {
             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
         }
@@ -106,8 +114,25 @@
         
         this.corePoolSize = corePoolSize;
         this.maximumPoolSize = maximumPoolSize;
+        setQueueHandler(queueMonitor);
     }
     
+    public IoEventQueueHandler getQueueHandler() {
+        return queueHandler;
+    }
+
+    public void setQueueHandler(IoEventQueueHandler queueHandler) {
+        if (queueHandler == null) {
+            queueHandler = NOOP_QUEUE_MONITOR;
+        }
+        this.queueHandler = queueHandler;
+    }
+
+    @Override
+    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
+        // Ignore the request.  It must always be AbortPolicy.
+    }
+
     private void addWorker() {
         synchronized (workers) {
             if (workers.size() >= maximumPoolSize) {
@@ -253,10 +278,15 @@
         Queue<Runnable> queue = buf.queue;
         boolean offer;
         synchronized (queue) {
-            queue.offer(e);
-            if (buf.processingCompleted) {
-                buf.processingCompleted = false;
-                offer = true;
+            if (queueHandler.accept(this, e)) {
+                queue.offer(e);
+                queueHandler.offered(this, e);
+                if (buf.processingCompleted) {
+                    buf.processingCompleted = false;
+                    offer = true;
+                } else {
+                    offer = false;
+                }
             } else {
                 offer = false;
             }
@@ -345,7 +375,12 @@
             }
         }
     }
-
+    
+    @Override
+    public BlockingQueue<Runnable> getQueue() {
+        throw new UnsupportedOperationException("Please use getQueue(Runnable) instead.");
+    }
+    
     @Override
     public void purge() {
     }
@@ -482,6 +517,8 @@
                         break;
                     }
                 }
+
+                queueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
 
                 runTask(task);
             }