You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2012/12/21 21:37:06 UTC

svn commit: r1425131 - in /mina/mina/trunk: core/src/main/java/org/apache/mina/api/ core/src/main/java/org/apache/mina/service/ core/src/main/java/org/apache/mina/service/client/ core/src/main/java/org/apache/mina/service/executor/ core/src/main/java/o...

Author: jvermillard
Date: Fri Dec 21 20:37:06 2012
New Revision: 1425131

URL: http://svn.apache.org/viewvc?rev=1425131&view=rev
Log:
IoHandler executor (working but WIP)

Added:
    mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/HandlerCaller.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/InOrderHandlerExecutor.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/IoHandlerExecutor.java
Removed:
    mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/FixedThreadPoolHandlerExecutor.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/HandlerExecutor.java
Modified:
    mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoService.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/service/AbstractIoService.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/service/server/AbstractIoServer.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpClient.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpClient.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java
    mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerFilterEventTest.java
    mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerHandlerTest.java
    mina/mina/trunk/core/src/test/resources/log4j2-test.xml
    mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/udpecho/NioUdpEchoServer.java
    mina/mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerDecoder.java

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoService.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoService.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoService.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoService.java Fri Dec 21 20:37:06 2012
@@ -21,6 +21,8 @@ package org.apache.mina.api;
 
 import java.util.Map;
 
+import org.apache.mina.service.executor.IoHandlerExecutor;
+
 /**
  * Base interface for all {@link IoServer}s and {@link IoClient}s that provide I/O service and manage {@link IoSession}
  * s.
@@ -51,6 +53,12 @@ public interface IoService {
     IoHandler getIoHandler();
 
     /**
+     * Get the {@link IoHandlerExecutor} used for executing {@link IoHandler} events in another pool of thread (not in
+     * the low level I/O one).
+     */
+    IoHandlerExecutor getIoHandlerExecutor();
+
+    /**
      * Get the list of filters installed on this service
      * 
      * @return The list of installed filters

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/service/AbstractIoService.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/service/AbstractIoService.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/service/AbstractIoService.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/service/AbstractIoService.java Fri Dec 21 20:37:06 2012
@@ -26,6 +26,7 @@ import org.apache.mina.api.IoFilter;
 import org.apache.mina.api.IoHandler;
 import org.apache.mina.api.IoService;
 import org.apache.mina.api.IoSession;
+import org.apache.mina.service.executor.IoHandlerExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +48,9 @@ public abstract class AbstractIoService 
     /** The high level business logic */
     private IoHandler handler;
 
+    /** used for executing IoHandler event in another pool of thread (not in the low level I/O one) */
+    protected final IoHandlerExecutor ioHandlerExecutor;
+
     /**
      * The Service states
      */
@@ -67,9 +71,14 @@ public abstract class AbstractIoService 
 
     /**
      * Create an AbstractIoService
+     * 
+     * @param eventExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O one).
+     *        Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+     *        operations.
      */
-    protected AbstractIoService() {
+    protected AbstractIoService(final IoHandlerExecutor eventExecutor) {
         this.state = ServiceState.NONE;
+        this.ioHandlerExecutor = eventExecutor;
     }
 
     /**
@@ -97,6 +106,14 @@ public abstract class AbstractIoService 
     }
 
     /**
+     * {@inheritDoc}
+     */
+    @Override
+    public IoHandlerExecutor getIoHandlerExecutor() {
+        return ioHandlerExecutor;
+    }
+
+    /**
      * @return true if the IoService is active
      */
     public boolean isActive() {

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java Fri Dec 21 20:37:06 2012
@@ -25,6 +25,7 @@ import org.apache.mina.api.IoClient;
 import org.apache.mina.api.IoFuture;
 import org.apache.mina.api.IoSession;
 import org.apache.mina.service.AbstractIoService;
+import org.apache.mina.service.executor.IoHandlerExecutor;
 
 /**
  * TODO
@@ -35,8 +36,8 @@ public abstract class AbstractIoClient e
     /**
      * Create an new AbstractIoClient instance
      */
-    protected AbstractIoClient() {
-        super();
+    protected AbstractIoClient(IoHandlerExecutor eventExecutor) {
+        super(eventExecutor);
     }
 
     @Override

Added: mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/HandlerCaller.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/HandlerCaller.java?rev=1425131&view=auto
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/HandlerCaller.java (added)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/HandlerCaller.java Fri Dec 21 20:37:06 2012
@@ -0,0 +1,58 @@
+package org.apache.mina.service.executor;
+
+import org.apache.mina.api.IoSession;
+
+/** Visitor calling, for each kind of event, the matching callback of the IoHandler of the session's service. */
+class HandlerCaller implements EventVisitor {
+    // an exception thrown by one of the callbacks is handed to the exceptionCaught callback of the IoHandler
+
+    @Override
+    public void visit(CloseEvent event) {
+        IoSession session = event.getSession();
+        try {
+            session.getService().getIoHandler().sessionClosed(session);
+        } catch (Exception e) {
+            session.getService().getIoHandler().exceptionCaught(session, e);
+        }
+    }
+
+    @Override
+    public void visit(IdleEvent event) {
+        IoSession session = event.getSession();
+        try {
+            session.getService().getIoHandler().sessionIdle(session, event.getIdleStatus());
+        } catch (Exception e) {
+            session.getService().getIoHandler().exceptionCaught(session, e);
+        }
+    }
+
+    @Override
+    public void visit(OpenEvent event) {
+        IoSession session = event.getSession();
+        try {
+            session.getService().getIoHandler().sessionOpened(session);
+        } catch (Exception e) {
+            session.getService().getIoHandler().exceptionCaught(session, e);
+        }
+    }
+
+    @Override
+    public void visit(ReceiveEvent event) {
+        IoSession session = event.getSession();
+        try {
+            session.getService().getIoHandler().messageReceived(session, event.getMessage());
+        } catch (Exception e) {
+            session.getService().getIoHandler().exceptionCaught(session, e);
+        }
+    }
+
+    @Override
+    public void visit(SentEvent event) {
+        IoSession session = event.getSession();
+        try {
+            session.getService().getIoHandler().messageSent(session, event.getMessage());
+        } catch (Exception e) {
+            session.getService().getIoHandler().exceptionCaught(session, e);
+        }
+    }
+}
\ No newline at end of file

Added: mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/InOrderHandlerExecutor.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/InOrderHandlerExecutor.java?rev=1425131&view=auto
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/InOrderHandlerExecutor.java (added)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/InOrderHandlerExecutor.java Fri Dec 21 20:37:06 2012
@@ -0,0 +1,97 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.service.executor;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** {@link IoHandlerExecutor} which always hands the events of a session to the same worker thread. */
+public class InOrderHandlerExecutor implements IoHandlerExecutor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InOrderHandlerExecutor.class);
+
+    /** the worker threads, the events of a session go to workers[session id % workers.length] */
+    private final Worker[] workers;
+
+    /** Create and start {@code workerThreadCount} workers, each with an event queue of {@code queueSize} capacity. */
+    public InOrderHandlerExecutor(int workerThreadCount, int queueSize) {
+        LOG.debug("creating InOrderHandlerExecutor workerThreadCount = {} queueSize = {}", workerThreadCount, queueSize);
+        workers = new Worker[workerThreadCount];
+        for (int i = 0; i < workerThreadCount; i++) {
+            workers[i] = new Worker(i, queueSize);
+        }
+        for (int i = 0; i < workerThreadCount; i++) {
+            workers[i].start();
+        }
+        LOG.debug("workers started");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void execute(Event event) {
+        try {
+            int workerIndex = (int) (event.getSession().getId() % workers.length);
+            LOG.debug("executing event {} in worker {}", event, workerIndex);
+            workers[workerIndex].enqueue(event);
+        } catch (InterruptedException e) {
+            // the event was not queued; restore the interrupt flag so the calling thread can react to it
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /** Thread taking events from its own bounded queue and passing them to the shared HandlerCaller. */
+    private static class Worker extends Thread {
+        private static final HandlerCaller caller = new HandlerCaller();
+
+        private final BlockingQueue<Event> queue;
+
+        public Worker(int index, int queueSize) {
+            super("IoHandlerWorker " + index);
+            queue = new LinkedBlockingQueue<Event>(queueSize);
+        }
+
+        /** Queue an event for this worker, waiting while the queue is full. */
+        public void enqueue(Event event) throws InterruptedException {
+            LOG.debug("enqueing event : {}", event);
+            queue.put(event);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void run() {
+            for (;;) {
+                try {
+                    Event e = queue.take();
+                    LOG.debug("dequeing event {}", e);
+                    e.visit(caller);
+                } catch (InterruptedException e) {
+                    // end this thread
+                    return;
+                } catch (RuntimeException e) {
+                    // keep the worker alive, otherwise its queue fills up and execute() blocks forever
+                    LOG.error("unexpected exception while processing an event", e);
+                }
+            }
+        }
+    }
+}

Added: mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/IoHandlerExecutor.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/IoHandlerExecutor.java?rev=1425131&view=auto
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/IoHandlerExecutor.java (added)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/IoHandlerExecutor.java Fri Dec 21 20:37:06 2012
@@ -0,0 +1,25 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.service.executor;
+
+public interface IoHandlerExecutor {
+    /** Execute the given event; services use this to run IoHandler callbacks outside the low level I/O threads. */
+    void execute(Event command);
+}

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/service/server/AbstractIoServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/service/server/AbstractIoServer.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/service/server/AbstractIoServer.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/service/server/AbstractIoServer.java Fri Dec 21 20:37:06 2012
@@ -21,6 +21,7 @@ package org.apache.mina.service.server;
 
 import org.apache.mina.api.IoServer;
 import org.apache.mina.service.AbstractIoService;
+import org.apache.mina.service.executor.IoHandlerExecutor;
 
 /**
  * Base implementation for {@link IoServer}s.
@@ -30,9 +31,13 @@ import org.apache.mina.service.AbstractI
 public abstract class AbstractIoServer extends AbstractIoService implements IoServer {
     /**
      * Create an new AbstractIoServer instance
+     * 
+     * @param eventExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O one).
+     *        Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+     *        operations.
      */
-    protected AbstractIoServer() {
-        super();
+    protected AbstractIoServer(final IoHandlerExecutor eventExecutor) {
+        super(eventExecutor);
     }
 
     // does the reuse address flag should be positioned
@@ -40,6 +45,7 @@ public abstract class AbstractIoServer e
 
     /**
      * Set the reuse address flag on the server socket
+     * 
      * @param reuseAddress <code>true</code> to enable
      */
     public void setReuseAddress(final boolean reuseAddress) {
@@ -48,6 +54,7 @@ public abstract class AbstractIoServer e
 
     /**
      * Is the reuse address enabled for this server.
+     * 
      * @return
      */
     public boolean isReuseAddress() {

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java Fri Dec 21 20:37:06 2012
@@ -40,6 +40,12 @@ import org.apache.mina.api.IoService;
 import org.apache.mina.api.IoSession;
 import org.apache.mina.filterchain.ReadFilterChainController;
 import org.apache.mina.filterchain.WriteFilterChainController;
+import org.apache.mina.service.executor.CloseEvent;
+import org.apache.mina.service.executor.IdleEvent;
+import org.apache.mina.service.executor.IoHandlerExecutor;
+import org.apache.mina.service.executor.OpenEvent;
+import org.apache.mina.service.executor.ReceiveEvent;
+import org.apache.mina.service.executor.SentEvent;
 import org.apache.mina.service.idlechecker.IdleChecker;
 import org.apache.mina.transport.nio.SelectorLoop;
 import org.apache.mina.util.AbstractIoFuture;
@@ -650,7 +656,14 @@ public abstract class AbstractIoSession 
             final IoHandler handler = getService().getIoHandler();
 
             if (handler != null) {
-                handler.sessionOpened(this);
+                IoHandlerExecutor executor = getService().getIoHandlerExecutor();
+                if (executor != null) {
+                    // asynchronous event
+                    executor.execute(new OpenEvent(this));
+                } else {
+                    // synchronous call (in the I/O loop)
+                    handler.sessionOpened(this);
+                }
             }
         } catch (final RuntimeException e) {
             processException(e);
@@ -669,7 +682,14 @@ public abstract class AbstractIoSession 
 
             final IoHandler handler = getService().getIoHandler();
             if (handler != null) {
-                handler.sessionClosed(this);
+                IoHandlerExecutor executor = getService().getIoHandlerExecutor();
+                if (executor != null) {
+                    // asynchronous event
+                    executor.execute(new CloseEvent(this));
+                } else {
+                    // synchronous call (in the I/O loop)
+                    handler.sessionClosed(this);
+                }
             }
         } catch (final RuntimeException e) {
             processException(e);
@@ -688,14 +708,28 @@ public abstract class AbstractIoSession 
             }
             final IoHandler handler = getService().getIoHandler();
             if (handler != null) {
-                handler.sessionIdle(this, status);
+                IoHandlerExecutor executor = getService().getIoHandlerExecutor();
+                if (executor != null) {
+                    // asynchronous event
+                    executor.execute(new IdleEvent(this, status));
+                } else {
+                    // synchronous call (in the I/O loop)
+                    handler.sessionIdle(this, status);
+                }
             }
         } catch (final RuntimeException e) {
             processException(e);
         }
-
     }
 
+    /** for knowing if the message buffer is the selector loop one */
+    static ThreadLocal<ByteBuffer> tl = new ThreadLocal<ByteBuffer>() {
+        @Override
+        protected ByteBuffer initialValue() {
+            return null;
+        }
+    };
+
     /**
      * process session message received event using the filter chain. To be called by the session {@link SelectorLoop} .
      * 
@@ -704,6 +738,7 @@ public abstract class AbstractIoSession 
     public void processMessageReceived(final ByteBuffer message) {
         LOG.debug("processing message '{}' received event for session {}", message, this);
 
+        tl.set(message);
         try {
             // save basic statistics
             readBytes += message.remaining();
@@ -711,15 +746,31 @@ public abstract class AbstractIoSession 
 
             if (chain.length < 1) {
                 LOG.debug("Nothing to do, the chain is empty");
+                final IoHandler handler = getService().getIoHandler();
+                if (handler != null) {
+                    IoHandlerExecutor executor = getService().getIoHandlerExecutor();
+                    if (executor != null) {
+                        // asynchronous event
+                        // copy the bytebuffer
+                        LOG.debug("copying bytebuffer before pushing to the executor");
+                        ByteBuffer original = message;
+                        ByteBuffer clone = ByteBuffer.allocate(original.capacity());
+                        original.rewind();// copy from the beginning
+                        clone.put(original);
+                        original.rewind();
+                        clone.flip();
+                        executor.execute(new ReceiveEvent(this, clone));
+                    } else {
+                        // synchronous call (in the I/O loop)
+                        handler.messageReceived(this, message);
+                    }
+                }
+
             } else {
                 readChainPosition = 0;
                 // we call the first filter, it's supposed to call the next ones using the filter chain controller
                 chain[readChainPosition].messageReceived(this, message, this);
             }
-            final IoHandler handler = getService().getIoHandler();
-            if (handler != null) {
-                handler.messageReceived(this, message);
-            }
         } catch (final RuntimeException e) {
             processException(e);
         }
@@ -772,7 +823,14 @@ public abstract class AbstractIoSession 
             }
             final IoHandler handler = getService().getIoHandler();
             if (handler != null) {
-                handler.messageSent(this, highLevelMessage);
+                IoHandlerExecutor executor = getService().getIoHandlerExecutor();
+                if (executor != null) {
+                    // asynchronous event
+                    executor.execute(new SentEvent(this, highLevelMessage));
+                } else {
+                    // synchronous call (in the I/O loop)
+                    handler.messageSent(this, highLevelMessage);
+                }
             }
         } catch (final RuntimeException e) {
             processException(e);
@@ -805,7 +863,7 @@ public abstract class AbstractIoSession 
      * At the end of write chain processing, enqueue final encoded {@link ByteBuffer} message in the session
      */
     private void enqueueFinalWriteMessage(final Object message) {
-        LOG.debug("end of write chan we enqueue the message in the session : {}", message);
+        LOG.debug("end of write chain we enqueue the message in the session : {}", message);
         lastWriteRequest = enqueueWriteRequest(message);
     }
 
@@ -818,6 +876,29 @@ public abstract class AbstractIoSession 
 
         if (readChainPosition >= chain.length) {
             // end of chain processing
+            final IoHandler handler = getService().getIoHandler();
+            if (handler != null) {
+                IoHandlerExecutor executor = getService().getIoHandlerExecutor();
+                if (executor != null) {
+                    // asynchronous event
+                    if (message == tl.get()) {
+                        // copy the bytebuffer
+                        LOG.debug("copying bytebuffer before pushing to the executor");
+                        ByteBuffer original = (ByteBuffer) message;
+                        ByteBuffer clone = ByteBuffer.allocate(original.capacity());
+                        original.rewind();// copy from the beginning
+                        clone.put(original);
+                        original.rewind();
+                        clone.flip();
+                        executor.execute(new ReceiveEvent(this, clone));
+                    } else {
+                        executor.execute(new ReceiveEvent(this, message));
+                    }
+                } else {
+                    // synchronous call (in the I/O loop)
+                    handler.messageReceived(this, message);
+                }
+            }
         } else {
             chain[readChainPosition].messageReceived(this, message, this);
         }

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java Fri Dec 21 20:37:06 2012
@@ -161,6 +161,7 @@ public class NioSelectorLoop implements 
             ops |= SelectionKey.OP_WRITE;
         }
         key.interestOps(ops);
+        key.selector().wakeup();
     }
 
     /**

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpClient.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpClient.java Fri Dec 21 20:37:06 2012
@@ -20,6 +20,7 @@
 package org.apache.mina.transport.nio;
 
 import org.apache.mina.api.IoSessionConfig;
+import org.apache.mina.service.executor.IoHandlerExecutor;
 import org.apache.mina.transport.tcp.AbstractTcpClient;
 
 /**
@@ -31,8 +32,8 @@ public class NioTcpClient extends Abstra
     /**
      * Create a new instance of NioTcpClient
      */
-    public NioTcpClient() {
-        super();
+    public NioTcpClient(IoHandlerExecutor ioHandlerExecutor) {
+        super(ioHandlerExecutor);
     }
 
     @Override

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java Fri Dec 21 20:37:06 2012
@@ -28,6 +28,8 @@ import java.nio.channels.ServerSocketCha
 import java.nio.channels.SocketChannel;
 
 import org.apache.mina.api.IdleStatus;
+import org.apache.mina.service.executor.InOrderHandlerExecutor;
+import org.apache.mina.service.executor.IoHandlerExecutor;
 import org.apache.mina.service.idlechecker.IdleChecker;
 import org.apache.mina.service.idlechecker.IndexedIdleChecker;
 import org.apache.mina.transport.tcp.AbstractTcpServer;
@@ -60,23 +62,28 @@ public class NioTcpServer extends Abstra
     private IdleChecker idleChecker;
 
     /**
-     * Create a TCP server with new selector pool of default size.
+     * Create a TCP server with new selector pool of default size and a {@link IoHandlerExecutor} of default type (
+     * {@link InOrderHandlerExecutor})
      */
     public NioTcpServer() {
         this(new NioSelectorLoop("accept", 0),
-                new FixedSelectorLoopPool(Runtime.getRuntime().availableProcessors() + 1));
+                new FixedSelectorLoopPool(Runtime.getRuntime().availableProcessors() + 1), new InOrderHandlerExecutor(
+                        Runtime.getRuntime().availableProcessors() + 1, Runtime.getRuntime().availableProcessors() + 1));
     }
 
     /**
-     * Create a TCP server with provided selector loops pool. We will use one SelectorLoop get from
-     * the pool to manage the OP_ACCEPT events. If the pool contains only one SelectorLoop, then
-     * all the events will be managed by the same Selector.
+     * Create a TCP server with provided selector loops pool. We will use one SelectorLoop get from the pool to manage
+     * the OP_ACCEPT events. If the pool contains only one SelectorLoop, then all the events will be managed by the same
+     * Selector.
      * 
      * @param acceptSelectorLoop the selector loop for handling accept events (connection of new session)
      * @param readWriteSelectorLoop the pool of selector loop for handling read/write events of connected sessions
+     * @param handlerExecutor used for executing IoHandler events in another pool of threads (not in the low level I/O
+     *        one). Use <code>null</code> if you don't want one; be careful, the IoHandler processing will then block
+     *        the I/O operations.
      */
-    public NioTcpServer(SelectorLoopPool selectorLoopPool) {
-        super();
+    public NioTcpServer(SelectorLoopPool selectorLoopPool, IoHandlerExecutor handlerExecutor) {
+        super(handlerExecutor);
         this.acceptSelectorLoop = selectorLoopPool.getSelectorLoop();
         this.readWriteSelectorPool = selectorLoopPool;
     }
@@ -86,9 +93,13 @@ public class NioTcpServer extends Abstra
      * 
      * @param acceptSelectorLoop the selector loop for handling accept events (connection of new session)
      * @param readWriteSelectorLoop the pool of selector loop for handling read/write events of connected sessions
-     */
-    public NioTcpServer(SelectorLoop acceptSelectorLoop, SelectorLoopPool readWriteSelectorLoop) {
-        super();
+     * @param handlerExecutor used for executing IoHandler events in another pool of threads (not in the low level I/O
+     *        one). Use <code>null</code> if you don't want one; be careful, the IoHandler processing will then block
+     *        the I/O operations.
+     */
+    public NioTcpServer(SelectorLoop acceptSelectorLoop, SelectorLoopPool readWriteSelectorLoop,
+            IoHandlerExecutor handlerExecutor) {
+        super(handlerExecutor);
         this.acceptSelectorLoop = acceptSelectorLoop;
         this.readWriteSelectorPool = readWriteSelectorLoop;
     }

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpClient.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpClient.java Fri Dec 21 20:37:06 2012
@@ -20,6 +20,7 @@
 package org.apache.mina.transport.nio;
 
 import org.apache.mina.api.IoSessionConfig;
+import org.apache.mina.service.executor.IoHandlerExecutor;
 import org.apache.mina.transport.udp.AbstractUdpClient;
 
 /**
@@ -31,8 +32,8 @@ public class NioUdpClient extends Abstra
     /**
      * Create a new instance of NioUdpClient
      */
-    public NioUdpClient() {
-        super();
+    public NioUdpClient(IoHandlerExecutor ioHandlerExecutor) {
+        super(ioHandlerExecutor);
     }
 
     @Override

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java Fri Dec 21 20:37:06 2012
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.mina.api.IoSessionConfig;
+import org.apache.mina.service.executor.IoHandlerExecutor;
 import org.apache.mina.service.idlechecker.IdleChecker;
 import org.apache.mina.service.idlechecker.IndexedIdleChecker;
 import org.apache.mina.transport.udp.AbstractUdpServer;
@@ -65,8 +66,8 @@ public class NioUdpServer extends Abstra
     /**
      * Create a new instance of NioUdpServer
      */
-    public NioUdpServer(final NioSelectorLoop selectorLoop) {
-        super();
+    public NioUdpServer(NioSelectorLoop selectorLoop, IoHandlerExecutor ioHandlerExecutor) {
+        super(ioHandlerExecutor);
         this.selectorLoop = selectorLoop;
     }
 

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java Fri Dec 21 20:37:06 2012
@@ -20,6 +20,7 @@
 package org.apache.mina.transport.tcp;
 
 import org.apache.mina.service.client.AbstractIoClient;
+import org.apache.mina.service.executor.IoHandlerExecutor;
 
 /**
  * Base class for UDP based Clients
@@ -30,7 +31,7 @@ public abstract class AbstractTcpClient 
     /**
      * Create an new AbsractTcpClient instance
      */
-    protected AbstractTcpClient() {
-        super();
+    protected AbstractTcpClient(IoHandlerExecutor ioHandlerExecutor) {
+        super(ioHandlerExecutor);
     }
 }

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java Fri Dec 21 20:37:06 2012
@@ -19,6 +19,7 @@
  */
 package org.apache.mina.transport.tcp;
 
+import org.apache.mina.service.executor.IoHandlerExecutor;
 import org.apache.mina.service.server.AbstractIoServer;
 
 /**
@@ -32,9 +33,13 @@ public abstract class AbstractTcpServer 
 
     /**
      * Create an new AbsractTcpServer instance
+     * 
+     * @param eventExecutor used for executing IoHandler events in another pool of threads (not in the low level I/O
+     *        one). Use <code>null</code> if you don't want one. Be careful: without one, the IoHandler processing
+     *        will block the I/O operations.
      */
-    protected AbstractTcpServer() {
-        super();
+    protected AbstractTcpServer(final IoHandlerExecutor eventExecutor) {
+        super(eventExecutor);
         this.config = new DefaultTcpSessionConfig();
     }
 
@@ -48,6 +53,7 @@ public abstract class AbstractTcpServer 
 
     /**
      * Set the default configuration for created TCP sessions
+     * 
      * @param config
      */
     public void setSessionConfig(final TcpSessionConfig config) {

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java Fri Dec 21 20:37:06 2012
@@ -23,6 +23,7 @@ import javax.net.ssl.SSLException;
 
 import org.apache.mina.api.IoSession;
 import org.apache.mina.service.client.AbstractIoClient;
+import org.apache.mina.service.executor.IoHandlerExecutor;
 
 /**
  * TODO
@@ -33,8 +34,8 @@ public abstract class AbstractUdpClient 
     /**
      * Create an new AbsractUdpClient instance
      */
-    protected AbstractUdpClient() {
-        super();
+    protected AbstractUdpClient(IoHandlerExecutor ioHandlerExecutor) {
+        super(ioHandlerExecutor);
     }
 
     /**

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java Fri Dec 21 20:37:06 2012
@@ -22,6 +22,7 @@ package org.apache.mina.transport.udp;
 import javax.net.ssl.SSLException;
 
 import org.apache.mina.api.IoSession;
+import org.apache.mina.service.executor.IoHandlerExecutor;
 import org.apache.mina.service.server.AbstractIoServer;
 
 /**
@@ -33,8 +34,8 @@ public abstract class AbstractUdpServer 
     /**
      * Create an new AbsractUdpServer instance
      */
-    protected AbstractUdpServer() {
-        super();
+    protected AbstractUdpServer(IoHandlerExecutor ioHandlerExecutor) {
+        super(ioHandlerExecutor);
     }
 
     /**

Modified: mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerFilterEventTest.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerFilterEventTest.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerFilterEventTest.java (original)
+++ mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerFilterEventTest.java Fri Dec 21 20:37:06 2012
@@ -19,9 +19,7 @@
  */
 package org.apache.mina.transport.tcp;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
+import static junit.framework.Assert.*;
 
 import java.io.IOException;
 import java.net.Socket;
@@ -76,7 +74,7 @@ public class NioTcpServerFilterEventTest
 
         // connect some clients
         for (int i = 0; i < CLIENT_COUNT; i++) {
-            //System.out.println("Creation client " + i);
+            // System.out.println("Creation client " + i);
             try {
                 clients[i] = new Socket("127.0.0.1", port);
             } catch (Exception e) {
@@ -126,15 +124,14 @@ public class NioTcpServerFilterEventTest
     }
 
     /**
-     * A test that creates 50 clients, each one of them writing one message. We will
-     * check that for each client we correctly process the sessionOpened, messageReceived,
-     * messageSent and sessionClosed events.
-     * We use only one selector to process all the OP events.
+     * A test that creates 50 clients, each one of them writing one message. We will check that for each client we
+     * correctly process the sessionOpened, messageReceived, messageSent and sessionClosed events. We use only one
+     * selector to process all the OP events.
      */
     @Test
     public void generateAllKindOfServerEventOneSelector() throws IOException, InterruptedException {
         SelectorLoopPool selectorLoopPool = new FixedSelectorLoopPool(1);
-        final NioTcpServer server = new NioTcpServer(selectorLoopPool.getSelectorLoop(), selectorLoopPool);
+        final NioTcpServer server = new NioTcpServer(selectorLoopPool.getSelectorLoop(), selectorLoopPool, null);
         server.setFilters(new MyCodec(), new Handler());
         server.bind(0);
         // warm up
@@ -147,7 +144,7 @@ public class NioTcpServerFilterEventTest
 
         // connect some clients
         for (int i = 0; i < CLIENT_COUNT; i++) {
-            //System.out.println("Creation client 2 " + i);
+            // System.out.println("Creation client 2 " + i);
             clients[i] = new Socket("127.0.0.1", port);
         }
 

Modified: mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerHandlerTest.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerHandlerTest.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerHandlerTest.java (original)
+++ mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerHandlerTest.java Fri Dec 21 20:37:06 2012
@@ -19,8 +19,7 @@
  */
 package org.apache.mina.transport.tcp;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.*;
 
 import java.io.IOException;
 import java.net.Socket;
@@ -53,7 +52,7 @@ public class NioTcpServerHandlerTest {
 
     private static final int CLIENT_COUNT = 100;
 
-    private static final int WAIT_TIME = 2000;
+    private static final int WAIT_TIME = 5000;
 
     private final CountDownLatch msgSentLatch = new CountDownLatch(CLIENT_COUNT);
 
@@ -127,7 +126,7 @@ public class NioTcpServerHandlerTest {
     @Test
     public void generateAllKindOfServerEventOneSelector() throws IOException, InterruptedException {
         SelectorLoopPool selectorLoopPool = new FixedSelectorLoopPool(1);
-        final NioTcpServer server = new NioTcpServer(selectorLoopPool.getSelectorLoop(), selectorLoopPool);
+        final NioTcpServer server = new NioTcpServer(selectorLoopPool.getSelectorLoop(), selectorLoopPool, null);
         server.setFilters();
         server.setIoHandler(new Handler());
         server.bind(0);
@@ -184,19 +183,19 @@ public class NioTcpServerHandlerTest {
 
         @Override
         public void sessionOpened(final IoSession session) {
-            LOG.info("** session open");
+            LOG.debug("** session open");
             openLatch.countDown();
         }
 
         @Override
         public void sessionClosed(final IoSession session) {
-            LOG.info("** session closed");
+            LOG.debug("** session closed");
             closedLatch.countDown();
         }
 
         @Override
         public void messageReceived(final IoSession session, final Object message) {
-            LOG.info("** message received {}", message);
+            LOG.debug("** message received {}", message);
             msgReadLatch.countDown();
             if (message instanceof ByteBuffer) {
                 final ByteBuffer msg = (ByteBuffer) message;
@@ -208,7 +207,7 @@ public class NioTcpServerHandlerTest {
 
         @Override
         public void messageSent(final IoSession session, final Object message) {
-            LOG.info("** message sent {}", message);
+            LOG.debug("** message sent {}", message);
             msgSentLatch.countDown();
         }
 

Modified: mina/mina/trunk/core/src/test/resources/log4j2-test.xml
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/test/resources/log4j2-test.xml?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/test/resources/log4j2-test.xml (original)
+++ mina/mina/trunk/core/src/test/resources/log4j2-test.xml Fri Dec 21 20:37:06 2012
@@ -7,7 +7,7 @@
   </appenders>
   <loggers>
     <logger name="org.apache.log4j.xml" level="info"/>
-    <root level="debug">
+    <root level="info">
       <appender-ref ref="STDOUT"/>
     </root>
   </loggers>

Modified: mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/udpecho/NioUdpEchoServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/udpecho/NioUdpEchoServer.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/udpecho/NioUdpEchoServer.java (original)
+++ mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/udpecho/NioUdpEchoServer.java Fri Dec 21 20:37:06 2012
@@ -47,7 +47,7 @@ public class NioUdpEchoServer {
     public static void main(final String[] args) {
         LOG.info("starting echo server");
 
-        final NioUdpServer server = new NioUdpServer(new NioSelectorLoop("I/O", 0));
+        final NioUdpServer server = new NioUdpServer(new NioSelectorLoop("I/O", 0), null);
 
         // create the fitler chain for this service
         server.setFilters(new LoggingFilter("LoggingFilter1"), new IoFilter() {

Modified: mina/mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerDecoder.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerDecoder.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerDecoder.java (original)
+++ mina/mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerDecoder.java Fri Dec 21 20:37:06 2012
@@ -102,25 +102,6 @@ public class HttpServerDecoder implement
                 session.setAttribute(DECODER_STATE_ATT, DecoderState.HEAD);
             } else {
                 controller.callReadNextFilter(rq);
-
-                // is it a request with some body content ?
-                if (rq.getMethod() == HttpMethod.POST || rq.getMethod() == HttpMethod.PUT) {
-                    LOG.debug("request with content");
-                    session.setAttribute(DECODER_STATE_ATT, DecoderState.BODY);
-
-                    String contentLen = rq.getHeader("content-length");
-
-                    if (contentLen != null) {
-                        LOG.debug("found content len : {}", contentLen);
-                        session.setAttribute(BODY_REMAINING_BYTES, new Integer(contentLen));
-                    } else {
-                        throw new RuntimeException("no content length !");
-                    }
-                } else {
-                    LOG.debug("request without content");
-                    session.setAttribute(DECODER_STATE_ATT, DecoderState.NEW);
-
-                }
             }
 
             return null;