You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2012/12/21 21:37:06 UTC
svn commit: r1425131 - in /mina/mina/trunk:
core/src/main/java/org/apache/mina/api/
core/src/main/java/org/apache/mina/service/
core/src/main/java/org/apache/mina/service/client/
core/src/main/java/org/apache/mina/service/executor/ core/src/main/java/o...
Author: jvermillard
Date: Fri Dec 21 20:37:06 2012
New Revision: 1425131
URL: http://svn.apache.org/viewvc?rev=1425131&view=rev
Log:
IoHandler executor (working but WIP)
Added:
mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/HandlerCaller.java
mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/InOrderHandlerExecutor.java
mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/IoHandlerExecutor.java
Removed:
mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/FixedThreadPoolHandlerExecutor.java
mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/HandlerExecutor.java
Modified:
mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoService.java
mina/mina/trunk/core/src/main/java/org/apache/mina/service/AbstractIoService.java
mina/mina/trunk/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java
mina/mina/trunk/core/src/main/java/org/apache/mina/service/server/AbstractIoServer.java
mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpClient.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpClient.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java
mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerFilterEventTest.java
mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerHandlerTest.java
mina/mina/trunk/core/src/test/resources/log4j2-test.xml
mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/udpecho/NioUdpEchoServer.java
mina/mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerDecoder.java
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoService.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoService.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoService.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoService.java Fri Dec 21 20:37:06 2012
@@ -21,6 +21,8 @@ package org.apache.mina.api;
import java.util.Map;
+import org.apache.mina.service.executor.IoHandlerExecutor;
+
/**
* Base interface for all {@link IoServer}s and {@link IoClient}s that provide I/O service and manage {@link IoSession}
* s.
@@ -51,6 +53,12 @@ public interface IoService {
IoHandler getIoHandler();
/**
+ * Get the {@link IoHandlerExecutor} used for executing {@link IoHandler} events in another pool of thread (not in
+ * the low level I/O one).
+ */
+ IoHandlerExecutor getIoHandlerExecutor();
+
+ /**
* Get the list of filters installed on this service
*
* @return The list of installed filters
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/service/AbstractIoService.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/service/AbstractIoService.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/service/AbstractIoService.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/service/AbstractIoService.java Fri Dec 21 20:37:06 2012
@@ -26,6 +26,7 @@ import org.apache.mina.api.IoFilter;
import org.apache.mina.api.IoHandler;
import org.apache.mina.api.IoService;
import org.apache.mina.api.IoSession;
+import org.apache.mina.service.executor.IoHandlerExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +48,9 @@ public abstract class AbstractIoService
/** The high level business logic */
private IoHandler handler;
+ /** used for executing IoHandler event in another pool of thread (not in the low level I/O one) */
+ protected final IoHandlerExecutor ioHandlerExecutor;
+
/**
* The Service states
*/
@@ -67,9 +71,14 @@ public abstract class AbstractIoService
/**
* Create an AbstractIoService
+ *
+ * @param eventExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O one).
+ * Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+ * operations.
*/
- protected AbstractIoService() {
+ protected AbstractIoService(final IoHandlerExecutor eventExecutor) {
this.state = ServiceState.NONE;
+ this.ioHandlerExecutor = eventExecutor;
}
/**
@@ -97,6 +106,14 @@ public abstract class AbstractIoService
}
/**
+ * {@inheritDoc}
+ */
+ @Override
+ public IoHandlerExecutor getIoHandlerExecutor() {
+ return ioHandlerExecutor;
+ }
+
+ /**
* @return true if the IoService is active
*/
public boolean isActive() {
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java Fri Dec 21 20:37:06 2012
@@ -25,6 +25,7 @@ import org.apache.mina.api.IoClient;
import org.apache.mina.api.IoFuture;
import org.apache.mina.api.IoSession;
import org.apache.mina.service.AbstractIoService;
+import org.apache.mina.service.executor.IoHandlerExecutor;
/**
* TODO
@@ -35,8 +36,8 @@ public abstract class AbstractIoClient e
/**
* Create an new AbstractIoClient instance
*/
- protected AbstractIoClient() {
- super();
+ protected AbstractIoClient(IoHandlerExecutor eventExecutor) {
+ super(eventExecutor);
}
@Override
Added: mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/HandlerCaller.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/HandlerCaller.java?rev=1425131&view=auto
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/HandlerCaller.java (added)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/HandlerCaller.java Fri Dec 21 20:37:06 2012
@@ -0,0 +1,58 @@
+package org.apache.mina.service.executor;
+
+import org.apache.mina.api.IoSession;
+
+class HandlerCaller implements EventVisitor {
+
+ @Override
+ public void visit(CloseEvent event) {
+ IoSession session = event.getSession();
+ try {
+ session.getService().getIoHandler().sessionClosed(session);
+ } catch (Exception e) {
+ session.getService().getIoHandler().exceptionCaught(session, e);
+ }
+ }
+
+ @Override
+ public void visit(IdleEvent event) {
+ IoSession session = event.getSession();
+ try {
+ session.getService().getIoHandler().sessionIdle(session, event.getIdleStatus());
+ } catch (Exception e) {
+ session.getService().getIoHandler().exceptionCaught(session, e);
+ }
+
+ }
+
+ @Override
+ public void visit(OpenEvent event) {
+ IoSession session = event.getSession();
+ try {
+ session.getService().getIoHandler().sessionOpened(session);
+ } catch (Exception e) {
+ session.getService().getIoHandler().exceptionCaught(session, e);
+ }
+
+ }
+
+ @Override
+ public void visit(ReceiveEvent event) {
+ IoSession session = event.getSession();
+ try {
+ session.getService().getIoHandler().messageReceived(session, event.getMessage());
+ } catch (Exception e) {
+ session.getService().getIoHandler().exceptionCaught(session, e);
+ }
+ }
+
+ @Override
+ public void visit(SentEvent event) {
+ IoSession session = event.getSession();
+ try {
+ session.getService().getIoHandler().messageSent(session, event.getMessage());
+ } catch (Exception e) {
+ session.getService().getIoHandler().exceptionCaught(session, e);
+ }
+ }
+}
\ No newline at end of file
Added: mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/InOrderHandlerExecutor.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/InOrderHandlerExecutor.java?rev=1425131&view=auto
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/InOrderHandlerExecutor.java (added)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/InOrderHandlerExecutor.java Fri Dec 21 20:37:06 2012
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.service.executor;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InOrderHandlerExecutor implements IoHandlerExecutor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(InOrderHandlerExecutor.class);
+
+ private Worker[] workers;
+
+ public InOrderHandlerExecutor(int workerThreadCount, int queueSize) {
+ LOG.debug("creating InOrderHandlerExecutor workerThreadCount = {} queueSize = {}", workerThreadCount, queueSize);
+ workers = new Worker[workerThreadCount];
+
+ for (int i = 0; i < workerThreadCount; i++) {
+ workers[i] = new Worker(i, queueSize);
+ }
+ for (int i = 0; i < workerThreadCount; i++) {
+ workers[i].start();
+ }
+ LOG.debug("workers started");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void execute(Event event) {
+ try {
+ int workerIndex = (int) (event.getSession().getId() % workers.length);
+ LOG.debug("executing event {} in worker {}", event, workerIndex);
+ workers[workerIndex].enqueue(event);
+ } catch (InterruptedException e) {
+ // interrupt the world
+ return;
+ }
+ }
+
+ private static class Worker extends Thread {
+
+ private static HandlerCaller caller = new HandlerCaller();
+
+ private final BlockingQueue<Event> queue;
+
+ public Worker(int index, int queueSize) {
+ super("IoHandlerWorker " + index);
+ queue = new LinkedBlockingQueue<Event>(queueSize);
+ }
+
+ public void enqueue(Event event) throws InterruptedException {
+ LOG.debug("enqueing event : {}", event);
+ queue.put(event);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void run() {
+ for (;;) {
+ try {
+
+ Event e = queue.take();
+ LOG.debug("dequeing event {}", e);
+ e.visit(caller);
+
+ } catch (InterruptedException e) {
+ // end this thread
+ return;
+ }
+ }
+ }
+ }
+}
Added: mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/IoHandlerExecutor.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/IoHandlerExecutor.java?rev=1425131&view=auto
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/IoHandlerExecutor.java (added)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/service/executor/IoHandlerExecutor.java Fri Dec 21 20:37:06 2012
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.service.executor;
+
+public interface IoHandlerExecutor {
+
+ void execute(Event command);
+}
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/service/server/AbstractIoServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/service/server/AbstractIoServer.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/service/server/AbstractIoServer.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/service/server/AbstractIoServer.java Fri Dec 21 20:37:06 2012
@@ -21,6 +21,7 @@ package org.apache.mina.service.server;
import org.apache.mina.api.IoServer;
import org.apache.mina.service.AbstractIoService;
+import org.apache.mina.service.executor.IoHandlerExecutor;
/**
* Base implementation for {@link IoServer}s.
@@ -30,9 +31,13 @@ import org.apache.mina.service.AbstractI
public abstract class AbstractIoServer extends AbstractIoService implements IoServer {
/**
* Create an new AbstractIoServer instance
+ *
+ * @param eventExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O one).
+ * Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+ * operations.
*/
- protected AbstractIoServer() {
- super();
+ protected AbstractIoServer(final IoHandlerExecutor eventExecutor) {
+ super(eventExecutor);
}
// does the reuse address flag should be positioned
@@ -40,6 +45,7 @@ public abstract class AbstractIoServer e
/**
* Set the reuse address flag on the server socket
+ *
* @param reuseAddress <code>true</code> to enable
*/
public void setReuseAddress(final boolean reuseAddress) {
@@ -48,6 +54,7 @@ public abstract class AbstractIoServer e
/**
* Is the reuse address enabled for this server.
+ *
* @return
*/
public boolean isReuseAddress() {
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java Fri Dec 21 20:37:06 2012
@@ -40,6 +40,12 @@ import org.apache.mina.api.IoService;
import org.apache.mina.api.IoSession;
import org.apache.mina.filterchain.ReadFilterChainController;
import org.apache.mina.filterchain.WriteFilterChainController;
+import org.apache.mina.service.executor.CloseEvent;
+import org.apache.mina.service.executor.IdleEvent;
+import org.apache.mina.service.executor.IoHandlerExecutor;
+import org.apache.mina.service.executor.OpenEvent;
+import org.apache.mina.service.executor.ReceiveEvent;
+import org.apache.mina.service.executor.SentEvent;
import org.apache.mina.service.idlechecker.IdleChecker;
import org.apache.mina.transport.nio.SelectorLoop;
import org.apache.mina.util.AbstractIoFuture;
@@ -650,7 +656,14 @@ public abstract class AbstractIoSession
final IoHandler handler = getService().getIoHandler();
if (handler != null) {
- handler.sessionOpened(this);
+ IoHandlerExecutor executor = getService().getIoHandlerExecutor();
+ if (executor != null) {
+ // asynchronous event
+ executor.execute(new OpenEvent(this));
+ } else {
+ // synchronous call (in the I/O loop)
+ handler.sessionOpened(this);
+ }
}
} catch (final RuntimeException e) {
processException(e);
@@ -669,7 +682,14 @@ public abstract class AbstractIoSession
final IoHandler handler = getService().getIoHandler();
if (handler != null) {
- handler.sessionClosed(this);
+ IoHandlerExecutor executor = getService().getIoHandlerExecutor();
+ if (executor != null) {
+ // asynchronous event
+ executor.execute(new CloseEvent(this));
+ } else {
+ // synchronous call (in the I/O loop)
+ handler.sessionClosed(this);
+ }
}
} catch (final RuntimeException e) {
processException(e);
@@ -688,14 +708,28 @@ public abstract class AbstractIoSession
}
final IoHandler handler = getService().getIoHandler();
if (handler != null) {
- handler.sessionIdle(this, status);
+ IoHandlerExecutor executor = getService().getIoHandlerExecutor();
+ if (executor != null) {
+ // asynchronous event
+ executor.execute(new IdleEvent(this, status));
+ } else {
+ // synchronous call (in the I/O loop)
+ handler.sessionIdle(this, status);
+ }
}
} catch (final RuntimeException e) {
processException(e);
}
-
}
+ /** for knowing if the message buffer is the selector loop one */
+ static ThreadLocal<ByteBuffer> tl = new ThreadLocal<ByteBuffer>() {
+ @Override
+ protected ByteBuffer initialValue() {
+ return null;
+ }
+ };
+
/**
* process session message received event using the filter chain. To be called by the session {@link SelectorLoop} .
*
@@ -704,6 +738,7 @@ public abstract class AbstractIoSession
public void processMessageReceived(final ByteBuffer message) {
LOG.debug("processing message '{}' received event for session {}", message, this);
+ tl.set(message);
try {
// save basic statistics
readBytes += message.remaining();
@@ -711,15 +746,31 @@ public abstract class AbstractIoSession
if (chain.length < 1) {
LOG.debug("Nothing to do, the chain is empty");
+ final IoHandler handler = getService().getIoHandler();
+ if (handler != null) {
+ IoHandlerExecutor executor = getService().getIoHandlerExecutor();
+ if (executor != null) {
+ // asynchronous event
+ // copy the bytebuffer
+ LOG.debug("copying bytebuffer before pushing to the executor");
+ ByteBuffer original = message;
+ ByteBuffer clone = ByteBuffer.allocate(original.capacity());
+ original.rewind();// copy from the beginning
+ clone.put(original);
+ original.rewind();
+ clone.flip();
+ executor.execute(new ReceiveEvent(this, clone));
+ } else {
+ // synchronous call (in the I/O loop)
+ handler.messageReceived(this, message);
+ }
+ }
+
} else {
readChainPosition = 0;
// we call the first filter, it's supposed to call the next ones using the filter chain controller
chain[readChainPosition].messageReceived(this, message, this);
}
- final IoHandler handler = getService().getIoHandler();
- if (handler != null) {
- handler.messageReceived(this, message);
- }
} catch (final RuntimeException e) {
processException(e);
}
@@ -772,7 +823,14 @@ public abstract class AbstractIoSession
}
final IoHandler handler = getService().getIoHandler();
if (handler != null) {
- handler.messageSent(this, highLevelMessage);
+ IoHandlerExecutor executor = getService().getIoHandlerExecutor();
+ if (executor != null) {
+ // asynchronous event
+ executor.execute(new SentEvent(this, highLevelMessage));
+ } else {
+ // synchronous call (in the I/O loop)
+ handler.messageSent(this, highLevelMessage);
+ }
}
} catch (final RuntimeException e) {
processException(e);
@@ -805,7 +863,7 @@ public abstract class AbstractIoSession
* At the end of write chain processing, enqueue final encoded {@link ByteBuffer} message in the session
*/
private void enqueueFinalWriteMessage(final Object message) {
- LOG.debug("end of write chan we enqueue the message in the session : {}", message);
+ LOG.debug("end of write chain we enqueue the message in the session : {}", message);
lastWriteRequest = enqueueWriteRequest(message);
}
@@ -818,6 +876,29 @@ public abstract class AbstractIoSession
if (readChainPosition >= chain.length) {
// end of chain processing
+ final IoHandler handler = getService().getIoHandler();
+ if (handler != null) {
+ IoHandlerExecutor executor = getService().getIoHandlerExecutor();
+ if (executor != null) {
+ // asynchronous event
+ if (message == tl.get()) {
+ // copy the bytebuffer
+ LOG.debug("copying bytebuffer before pushing to the executor");
+ ByteBuffer original = (ByteBuffer) message;
+ ByteBuffer clone = ByteBuffer.allocate(original.capacity());
+ original.rewind();// copy from the beginning
+ clone.put(original);
+ original.rewind();
+ clone.flip();
+ executor.execute(new ReceiveEvent(this, clone));
+ } else {
+ executor.execute(new ReceiveEvent(this, message));
+ }
+ } else {
+ // synchronous call (in the I/O loop)
+ handler.messageReceived(this, message);
+ }
+ }
} else {
chain[readChainPosition].messageReceived(this, message, this);
}
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java Fri Dec 21 20:37:06 2012
@@ -161,6 +161,7 @@ public class NioSelectorLoop implements
ops |= SelectionKey.OP_WRITE;
}
key.interestOps(ops);
+ key.selector().wakeup();
}
/**
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpClient.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpClient.java Fri Dec 21 20:37:06 2012
@@ -20,6 +20,7 @@
package org.apache.mina.transport.nio;
import org.apache.mina.api.IoSessionConfig;
+import org.apache.mina.service.executor.IoHandlerExecutor;
import org.apache.mina.transport.tcp.AbstractTcpClient;
/**
@@ -31,8 +32,8 @@ public class NioTcpClient extends Abstra
/**
* Create a new instance of NioTcpClient
*/
- public NioTcpClient() {
- super();
+ public NioTcpClient(IoHandlerExecutor ioHandlerExecutor) {
+ super(ioHandlerExecutor);
}
@Override
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java Fri Dec 21 20:37:06 2012
@@ -28,6 +28,8 @@ import java.nio.channels.ServerSocketCha
import java.nio.channels.SocketChannel;
import org.apache.mina.api.IdleStatus;
+import org.apache.mina.service.executor.InOrderHandlerExecutor;
+import org.apache.mina.service.executor.IoHandlerExecutor;
import org.apache.mina.service.idlechecker.IdleChecker;
import org.apache.mina.service.idlechecker.IndexedIdleChecker;
import org.apache.mina.transport.tcp.AbstractTcpServer;
@@ -60,23 +62,28 @@ public class NioTcpServer extends Abstra
private IdleChecker idleChecker;
/**
- * Create a TCP server with new selector pool of default size.
+ * Create a TCP server with new selector pool of default size and a {@link IoHandlerExecutor} of default type (
+ * {@link InOrderHandlerExecutor})
*/
public NioTcpServer() {
this(new NioSelectorLoop("accept", 0),
- new FixedSelectorLoopPool(Runtime.getRuntime().availableProcessors() + 1));
+ new FixedSelectorLoopPool(Runtime.getRuntime().availableProcessors() + 1), new InOrderHandlerExecutor(
+ Runtime.getRuntime().availableProcessors() + 1, Runtime.getRuntime().availableProcessors() + 1));
}
/**
- * Create a TCP server with provided selector loops pool. We will use one SelectorLoop get from
- * the pool to manage the OP_ACCEPT events. If the pool contains only one SelectorLoop, then
- * all the events will be managed by the same Selector.
+ * Create a TCP server with provided selector loops pool. We will use one SelectorLoop get from the pool to manage
+ * the OP_ACCEPT events. If the pool contains only one SelectorLoop, then all the events will be managed by the same
+ * Selector.
*
* @param acceptSelectorLoop the selector loop for handling accept events (connection of new session)
* @param readWriteSelectorLoop the pool of selector loop for handling read/write events of connected sessions
+ * @param ioHandlerExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O
+ * one). Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+ * operations.
*/
- public NioTcpServer(SelectorLoopPool selectorLoopPool) {
- super();
+ public NioTcpServer(SelectorLoopPool selectorLoopPool, IoHandlerExecutor handlerExecutor) {
+ super(handlerExecutor);
this.acceptSelectorLoop = selectorLoopPool.getSelectorLoop();
this.readWriteSelectorPool = selectorLoopPool;
}
@@ -86,9 +93,13 @@ public class NioTcpServer extends Abstra
*
* @param acceptSelectorLoop the selector loop for handling accept events (connection of new session)
* @param readWriteSelectorLoop the pool of selector loop for handling read/write events of connected sessions
- */
- public NioTcpServer(SelectorLoop acceptSelectorLoop, SelectorLoopPool readWriteSelectorLoop) {
- super();
+ * @param ioHandlerExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O
+ * one). Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+ * operations.
+ */
+ public NioTcpServer(SelectorLoop acceptSelectorLoop, SelectorLoopPool readWriteSelectorLoop,
+ IoHandlerExecutor handlerExecutor) {
+ super(handlerExecutor);
this.acceptSelectorLoop = acceptSelectorLoop;
this.readWriteSelectorPool = readWriteSelectorLoop;
}
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpClient.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpClient.java Fri Dec 21 20:37:06 2012
@@ -20,6 +20,7 @@
package org.apache.mina.transport.nio;
import org.apache.mina.api.IoSessionConfig;
+import org.apache.mina.service.executor.IoHandlerExecutor;
import org.apache.mina.transport.udp.AbstractUdpClient;
/**
@@ -31,8 +32,8 @@ public class NioUdpClient extends Abstra
/**
* Create a new instance of NioUdpClient
*/
- public NioUdpClient() {
- super();
+ public NioUdpClient(IoHandlerExecutor ioHandlerExecutor) {
+ super(ioHandlerExecutor);
}
@Override
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java Fri Dec 21 20:37:06 2012
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.mina.api.IoSessionConfig;
+import org.apache.mina.service.executor.IoHandlerExecutor;
import org.apache.mina.service.idlechecker.IdleChecker;
import org.apache.mina.service.idlechecker.IndexedIdleChecker;
import org.apache.mina.transport.udp.AbstractUdpServer;
@@ -65,8 +66,8 @@ public class NioUdpServer extends Abstra
/**
* Create a new instance of NioUdpServer
*/
- public NioUdpServer(final NioSelectorLoop selectorLoop) {
- super();
+ public NioUdpServer(NioSelectorLoop selectorLoop, IoHandlerExecutor ioHandlerExecutor) {
+ super(ioHandlerExecutor);
this.selectorLoop = selectorLoop;
}
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpClient.java Fri Dec 21 20:37:06 2012
@@ -20,6 +20,7 @@
package org.apache.mina.transport.tcp;
import org.apache.mina.service.client.AbstractIoClient;
+import org.apache.mina.service.executor.IoHandlerExecutor;
/**
* Base class for UDP based Clients
@@ -30,7 +31,7 @@ public abstract class AbstractTcpClient
/**
* Create an new AbsractTcpClient instance
*/
- protected AbstractTcpClient() {
- super();
+ protected AbstractTcpClient(IoHandlerExecutor ioHandlerExecutor) {
+ super(ioHandlerExecutor);
}
}
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java Fri Dec 21 20:37:06 2012
@@ -19,6 +19,7 @@
*/
package org.apache.mina.transport.tcp;
+import org.apache.mina.service.executor.IoHandlerExecutor;
import org.apache.mina.service.server.AbstractIoServer;
/**
@@ -32,9 +33,13 @@ public abstract class AbstractTcpServer
/**
* Create an new AbsractTcpServer instance
+ *
+ * @param eventExecutor used for executing IoHandler event in another pool of thread (not in the low level I/O one).
+ * Use <code>null</code> if you don't want one. Be careful, the IoHandler processing will block the I/O
+ * operations.
*/
- protected AbstractTcpServer() {
- super();
+ protected AbstractTcpServer(final IoHandlerExecutor eventExecutor) {
+ super(eventExecutor);
this.config = new DefaultTcpSessionConfig();
}
@@ -48,6 +53,7 @@ public abstract class AbstractTcpServer
/**
* Set the default configuration for created TCP sessions
+ *
* @param config
*/
public void setSessionConfig(final TcpSessionConfig config) {
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpClient.java Fri Dec 21 20:37:06 2012
@@ -23,6 +23,7 @@ import javax.net.ssl.SSLException;
import org.apache.mina.api.IoSession;
import org.apache.mina.service.client.AbstractIoClient;
+import org.apache.mina.service.executor.IoHandlerExecutor;
/**
* TODO
@@ -33,8 +34,8 @@ public abstract class AbstractUdpClient
/**
* Create an new AbsractUdpClient instance
*/
- protected AbstractUdpClient() {
- super();
+ protected AbstractUdpClient(IoHandlerExecutor ioHandlerExecutor) {
+ super(ioHandlerExecutor);
}
/**
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java Fri Dec 21 20:37:06 2012
@@ -22,6 +22,7 @@ package org.apache.mina.transport.udp;
import javax.net.ssl.SSLException;
import org.apache.mina.api.IoSession;
+import org.apache.mina.service.executor.IoHandlerExecutor;
import org.apache.mina.service.server.AbstractIoServer;
/**
@@ -33,8 +34,8 @@ public abstract class AbstractUdpServer
/**
* Create an new AbsractUdpServer instance
*/
- protected AbstractUdpServer() {
- super();
+ protected AbstractUdpServer(IoHandlerExecutor ioHandlerExecutor) {
+ super(ioHandlerExecutor);
}
/**
Modified: mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerFilterEventTest.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerFilterEventTest.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerFilterEventTest.java (original)
+++ mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerFilterEventTest.java Fri Dec 21 20:37:06 2012
@@ -19,9 +19,7 @@
*/
package org.apache.mina.transport.tcp;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
+import static junit.framework.Assert.*;
import java.io.IOException;
import java.net.Socket;
@@ -76,7 +74,7 @@ public class NioTcpServerFilterEventTest
// connect some clients
for (int i = 0; i < CLIENT_COUNT; i++) {
- //System.out.println("Creation client " + i);
+ // System.out.println("Creation client " + i);
try {
clients[i] = new Socket("127.0.0.1", port);
} catch (Exception e) {
@@ -126,15 +124,14 @@ public class NioTcpServerFilterEventTest
}
/**
- * A test that creates 50 clients, each one of them writing one message. We will
- * check that for each client we correctly process the sessionOpened, messageReceived,
- * messageSent and sessionClosed events.
- * We use only one selector to process all the OP events.
+ * A test that creates 50 clients, each one of them writing one message. We will check that for each client we
+ * correctly process the sessionOpened, messageReceived, messageSent and sessionClosed events. We use only one
+ * selector to process all the OP events.
*/
@Test
public void generateAllKindOfServerEventOneSelector() throws IOException, InterruptedException {
SelectorLoopPool selectorLoopPool = new FixedSelectorLoopPool(1);
- final NioTcpServer server = new NioTcpServer(selectorLoopPool.getSelectorLoop(), selectorLoopPool);
+ final NioTcpServer server = new NioTcpServer(selectorLoopPool.getSelectorLoop(), selectorLoopPool, null);
server.setFilters(new MyCodec(), new Handler());
server.bind(0);
// warm up
@@ -147,7 +144,7 @@ public class NioTcpServerFilterEventTest
// connect some clients
for (int i = 0; i < CLIENT_COUNT; i++) {
- //System.out.println("Creation client 2 " + i);
+ // System.out.println("Creation client 2 " + i);
clients[i] = new Socket("127.0.0.1", port);
}
Modified: mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerHandlerTest.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerHandlerTest.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerHandlerTest.java (original)
+++ mina/mina/trunk/core/src/test/java/org/apache/mina/transport/tcp/NioTcpServerHandlerTest.java Fri Dec 21 20:37:06 2012
@@ -19,8 +19,7 @@
*/
package org.apache.mina.transport.tcp;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.*;
import java.io.IOException;
import java.net.Socket;
@@ -53,7 +52,7 @@ public class NioTcpServerHandlerTest {
private static final int CLIENT_COUNT = 100;
- private static final int WAIT_TIME = 2000;
+ private static final int WAIT_TIME = 5000;
private final CountDownLatch msgSentLatch = new CountDownLatch(CLIENT_COUNT);
@@ -127,7 +126,7 @@ public class NioTcpServerHandlerTest {
@Test
public void generateAllKindOfServerEventOneSelector() throws IOException, InterruptedException {
SelectorLoopPool selectorLoopPool = new FixedSelectorLoopPool(1);
- final NioTcpServer server = new NioTcpServer(selectorLoopPool.getSelectorLoop(), selectorLoopPool);
+ final NioTcpServer server = new NioTcpServer(selectorLoopPool.getSelectorLoop(), selectorLoopPool, null);
server.setFilters();
server.setIoHandler(new Handler());
server.bind(0);
@@ -184,19 +183,19 @@ public class NioTcpServerHandlerTest {
@Override
public void sessionOpened(final IoSession session) {
- LOG.info("** session open");
+ LOG.debug("** session open");
openLatch.countDown();
}
@Override
public void sessionClosed(final IoSession session) {
- LOG.info("** session closed");
+ LOG.debug("** session closed");
closedLatch.countDown();
}
@Override
public void messageReceived(final IoSession session, final Object message) {
- LOG.info("** message received {}", message);
+ LOG.debug("** message received {}", message);
msgReadLatch.countDown();
if (message instanceof ByteBuffer) {
final ByteBuffer msg = (ByteBuffer) message;
@@ -208,7 +207,7 @@ public class NioTcpServerHandlerTest {
@Override
public void messageSent(final IoSession session, final Object message) {
- LOG.info("** message sent {}", message);
+ LOG.debug("** message sent {}", message);
msgSentLatch.countDown();
}
Modified: mina/mina/trunk/core/src/test/resources/log4j2-test.xml
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/test/resources/log4j2-test.xml?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/core/src/test/resources/log4j2-test.xml (original)
+++ mina/mina/trunk/core/src/test/resources/log4j2-test.xml Fri Dec 21 20:37:06 2012
@@ -7,7 +7,7 @@
</appenders>
<loggers>
<logger name="org.apache.log4j.xml" level="info"/>
- <root level="debug">
+ <root level="info">
<appender-ref ref="STDOUT"/>
</root>
</loggers>
Modified: mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/udpecho/NioUdpEchoServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/udpecho/NioUdpEchoServer.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/udpecho/NioUdpEchoServer.java (original)
+++ mina/mina/trunk/examples/src/main/java/org/apache/mina/examples/udpecho/NioUdpEchoServer.java Fri Dec 21 20:37:06 2012
@@ -47,7 +47,7 @@ public class NioUdpEchoServer {
public static void main(final String[] args) {
LOG.info("starting echo server");
- final NioUdpServer server = new NioUdpServer(new NioSelectorLoop("I/O", 0));
+ final NioUdpServer server = new NioUdpServer(new NioSelectorLoop("I/O", 0), null);
// create the fitler chain for this service
server.setFilters(new LoggingFilter("LoggingFilter1"), new IoFilter() {
Modified: mina/mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerDecoder.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerDecoder.java?rev=1425131&r1=1425130&r2=1425131&view=diff
==============================================================================
--- mina/mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerDecoder.java (original)
+++ mina/mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerDecoder.java Fri Dec 21 20:37:06 2012
@@ -102,25 +102,6 @@ public class HttpServerDecoder implement
session.setAttribute(DECODER_STATE_ATT, DecoderState.HEAD);
} else {
controller.callReadNextFilter(rq);
-
- // is it a request with some body content ?
- if (rq.getMethod() == HttpMethod.POST || rq.getMethod() == HttpMethod.PUT) {
- LOG.debug("request with content");
- session.setAttribute(DECODER_STATE_ATT, DecoderState.BODY);
-
- String contentLen = rq.getHeader("content-length");
-
- if (contentLen != null) {
- LOG.debug("found content len : {}", contentLen);
- session.setAttribute(BODY_REMAINING_BYTES, new Integer(contentLen));
- } else {
- throw new RuntimeException("no content length !");
- }
- } else {
- LOG.debug("request without content");
- session.setAttribute(DECODER_STATE_ATT, DecoderState.NEW);
-
- }
}
return null;