You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by ma...@apache.org on 2010/09/12 18:43:52 UTC
svn commit: r996337 - in /mina/trunk/mina-core/src:
main/java/org/apache/mina/core/service/
test/java/org/apache/mina/core/service/
test/java/org/apache/mina/filter/logging/
Author: maarten
Date: Sun Sep 12 16:43:52 2010
New Revision: 996337
URL: http://svn.apache.org/viewvc?rev=996337&view=rev
Log:
added the API to block until ExecutorService of AbstractIoService is terminated
Added an overloaded dispose method : public final void dispose(boolean awaitTermination) that can block until ExecutorService of AbstractIoService is terminated
Added:
mina/trunk/mina-core/src/test/java/org/apache/mina/core/service/
mina/trunk/mina-core/src/test/java/org/apache/mina/core/service/AbstractIoServiceTest.java
Modified:
mina/trunk/mina-core/src/main/java/org/apache/mina/core/service/AbstractIoService.java
mina/trunk/mina-core/src/main/java/org/apache/mina/core/service/IoService.java
mina/trunk/mina-core/src/test/java/org/apache/mina/filter/logging/MdcInjectionFilterTest.java
Modified: mina/trunk/mina-core/src/main/java/org/apache/mina/core/service/AbstractIoService.java
URL: http://svn.apache.org/viewvc/mina/trunk/mina-core/src/main/java/org/apache/mina/core/service/AbstractIoService.java?rev=996337&r1=996336&r2=996337&view=diff
==============================================================================
--- mina/trunk/mina-core/src/main/java/org/apache/mina/core/service/AbstractIoService.java (original)
+++ mina/trunk/mina-core/src/main/java/org/apache/mina/core/service/AbstractIoService.java Sun Sep 12 16:43:52 2010
@@ -27,6 +27,7 @@ import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.core.IoUtil;
@@ -47,6 +48,8 @@ import org.apache.mina.core.session.IoSe
import org.apache.mina.core.session.IoSessionInitializer;
import org.apache.mina.util.ExceptionMonitor;
import org.apache.mina.util.NamePreservingRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Base implementation of {@link IoService}s.
@@ -57,6 +60,8 @@ import org.apache.mina.util.NamePreservi
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public abstract class AbstractIoService implements IoService {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIoService.class);
/**
* The unique number identifying the Service. It's incremented
* for each new IoService created.
@@ -269,28 +274,48 @@ public abstract class AbstractIoService
* {@inheritDoc}
*/
public final void dispose() {
- if (disposed) {
- return;
- }
+ dispose(false);
+ }
- synchronized (disposalLock) {
- if (!disposing) {
- disposing = true;
-
- try {
- dispose0();
- } catch (Exception e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
+ /**
+ * {@inheritDoc}
+ */
+ public final void dispose(boolean awaitTermination) {
+ if (disposed) {
+ return;
+ }
+
+ synchronized (disposalLock) {
+ if (!disposing) {
+ disposing = true;
+
+ try {
+ dispose0();
+ } catch (Exception e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+
+ if (createdExecutor) {
+ ExecutorService e = (ExecutorService) executor;
+ e.shutdownNow();
+ if (awaitTermination) {
+
+ //Thread.currentThread().setName();
+
+ try {
+ LOGGER.debug("awaitTermination on {} called by thread=[{}]", this, Thread.currentThread().getName());
+ e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
+ LOGGER.debug("awaitTermination on {} finished", this);
+ } catch (InterruptedException e1) {
+ LOGGER.warn("awaitTermination on [{}] was interrupted", this);
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
}
- }
-
- if (createdExecutor) {
- ExecutorService e = (ExecutorService) executor;
- e.shutdownNow();
- }
-
- disposed = true;
+ }
+ }
+ disposed = true;
}
/**
@@ -474,7 +499,7 @@ public abstract class AbstractIoService
/**
* Implement this method to perform additional tasks required for session
* initialization. Do not call this method directly;
- * {@link #finishSessionInitialization(IoSession, IoFuture, IoSessionInitializer)} will call
+ * {@link #initSession(IoSession, IoFuture, IoSessionInitializer)} will call
* this method instead.
*/
protected void finishSessionInitialization0(IoSession session,
Modified: mina/trunk/mina-core/src/main/java/org/apache/mina/core/service/IoService.java
URL: http://svn.apache.org/viewvc/mina/trunk/mina-core/src/main/java/org/apache/mina/core/service/IoService.java?rev=996337&r1=996336&r2=996337&view=diff
==============================================================================
--- mina/trunk/mina-core/src/main/java/org/apache/mina/core/service/IoService.java (original)
+++ mina/trunk/mina-core/src/main/java/org/apache/mina/core/service/IoService.java Sun Sep 12 16:43:52 2010
@@ -75,6 +75,17 @@ public interface IoService {
*/
void dispose();
+ /**
+ * Releases any resources allocated by this service. Please note that
+ * this method might block as long as there are any sessions managed by this service.
+ *
+ * Warning : calling this method from an IoFutureListener with <code>awaitTermination</code> = true
+ * will probably lead to a deadlock.
+ *
+ * @param awaitTermination When true this method will block until the underlying ExecutorService is terminated
+ */
+ void dispose(boolean awaitTermination);
+
/**
* Returns the handler which will handle all connections managed by this service.
*/
Added: mina/trunk/mina-core/src/test/java/org/apache/mina/core/service/AbstractIoServiceTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/mina-core/src/test/java/org/apache/mina/core/service/AbstractIoServiceTest.java?rev=996337&view=auto
==============================================================================
--- mina/trunk/mina-core/src/test/java/org/apache/mina/core/service/AbstractIoServiceTest.java (added)
+++ mina/trunk/mina-core/src/test/java/org/apache/mina/core/service/AbstractIoServiceTest.java Sun Sep 12 16:43:52 2010
@@ -0,0 +1,158 @@
+package org.apache.mina.core.service;
+
+import junit.framework.Assert;
+import org.apache.mina.core.future.CloseFuture;
+import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.future.IoFuture;
+import org.apache.mina.core.future.IoFutureListener;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
+import org.apache.mina.filter.logging.LoggingFilter;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.apache.mina.transport.socket.nio.NioSocketConnector;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+public class AbstractIoServiceTest {
+
+ private static final int PORT = 9123;
+
+ @Test
+ public void testDispose() throws IOException, InterruptedException {
+
+ List threadsBefore = getThreadNames();
+
+ final IoAcceptor acceptor = new NioSocketAcceptor();
+
+ acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
+ acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
+
+ acceptor.setHandler( new ServerHandler() );
+
+ acceptor.getSessionConfig().setReadBufferSize( 2048 );
+ acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
+ acceptor.bind( new InetSocketAddress(PORT) );
+ System.out.println("Server running ...");
+
+ final NioSocketConnector connector = new NioSocketConnector();
+
+ // Set connect timeout.
+ connector.setConnectTimeoutMillis(30 * 1000L);
+
+ connector.setHandler(new ClientHandler());
+ connector.getFilterChain().addLast( "logger", new LoggingFilter() );
+ connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
+
+ // Start communication.
+ ConnectFuture cf = connector.connect(new InetSocketAddress("localhost", 9123));
+ cf.awaitUninterruptibly();
+
+ IoSession session = cf.getSession();
+
+ // send a message
+ session.write("Hello World!\r");
+
+ // wait until response is received
+ CountDownLatch latch = (CountDownLatch) session.getAttribute("latch");
+ latch.await();
+
+ // close the session
+ CloseFuture closeFuture = session.close(false);
+
+ System.out.println("session.close called");
+ //Thread.sleep(5);
+
+ // wait for session close and then dispose the connector
+ closeFuture.addListener(new IoFutureListener<IoFuture>() {
+
+ public void operationComplete(IoFuture future) {
+ System.out.println("managed session count=" + connector.getManagedSessionCount());
+ System.out.println("Disposing connector ...");
+ connector.dispose(true);
+ System.out.println("Disposing connector ... *finished*");
+
+ }
+ });
+
+ closeFuture.awaitUninterruptibly();
+ acceptor.dispose(true);
+
+ List threadsAfter = getThreadNames();
+
+ System.out.println("threadsBefore = " + threadsBefore);
+ System.out.println("threadsAfter = " + threadsAfter);
+
+ // Assert.assertEquals(threadsBefore, threadsAfter);
+
+ }
+
+
+ public static class ClientHandler extends IoHandlerAdapter {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger("CLIENT");
+
+ @Override
+ public void sessionCreated(IoSession session) throws Exception {
+ session.setAttribute("latch", new CountDownLatch(1));
+ }
+
+ @Override
+ public void messageReceived(IoSession session, Object message) throws Exception {
+ LOGGER.info("client: messageReceived("+session+", "+message+")");
+ CountDownLatch latch = (CountDownLatch) session.getAttribute("latch");
+ latch.countDown();
+ }
+
+ @Override
+ public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
+ LOGGER.warn("exceptionCaught:", cause);
+ }
+ }
+
+ public static class ServerHandler extends IoHandlerAdapter {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger("SERVER");
+
+ @Override
+ public void messageReceived(IoSession session, Object message) throws Exception {
+ LOGGER.info("server: messageReceived("+session+", "+message+")");
+ session.write(message.toString());
+ }
+
+ @Override
+ public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
+ LOGGER.warn("exceptionCaught:", cause);
+ }
+
+ }
+
+ public static void main(String[] args) throws IOException, InterruptedException {
+ new AbstractIoServiceTest().testDispose();
+ }
+
+ private List<String> getThreadNames() {
+ List<String> list = new ArrayList<String>();
+ int active = Thread.activeCount();
+ Thread[] threads = new Thread[active];
+ Thread.enumerate(threads);
+ for (Thread thread : threads) {
+ try {
+ String name = thread.getName();
+ list.add(name);
+ } catch (NullPointerException ignore) {
+ }
+ }
+ return list;
+ }
+
+}
Modified: mina/trunk/mina-core/src/test/java/org/apache/mina/filter/logging/MdcInjectionFilterTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/mina-core/src/test/java/org/apache/mina/filter/logging/MdcInjectionFilterTest.java?rev=996337&r1=996336&r2=996337&view=diff
==============================================================================
--- mina/trunk/mina-core/src/test/java/org/apache/mina/filter/logging/MdcInjectionFilterTest.java (original)
+++ mina/trunk/mina-core/src/test/java/org/apache/mina/filter/logging/MdcInjectionFilterTest.java Sun Sep 12 16:43:52 2010
@@ -95,15 +95,13 @@ public class MdcInjectionFilterTest {
@After
public void tearDown() throws Exception {
- acceptor.dispose();
+ acceptor.dispose(true);
org.apache.log4j.Logger.getRootLogger().setLevel(previousLevelRootLogger);
destroy(executorFilter1);
destroy(executorFilter2);
List<String> after = getThreadNames();
- System.out.println("");
- System.out.println("after = " + after);
// give acceptor some time to shut down
Thread.sleep(50);
@@ -111,16 +109,24 @@ public class MdcInjectionFilterTest {
int count = 0;
- // NOTE: this is *not* intended to be a permanenet fix for this test-case.
- // There just is no API to block until the ExecutorService of AbstractIoService is terminated.
+ // NOTE: this is *not* intended to be a permanent fix for this test-case.
+ // There used to be no API to block until the ExecutorService of AbstractIoService is terminated.
+ // The API exists now : dispose(true) so we should get rid of this code.
while (contains(after, "Nio") && count++ < 10) {
Thread.sleep(50);
after = getThreadNames();
- System.out.println("after = " + after);
+ System.out.println("** after = " + after);
}
System.out.println("============================");
-
+
+ while (contains(after, "pool") && count++ < 10) {
+ Thread.sleep(50);
+ after = getThreadNames();
+ System.out.println("** after = " + after);
+ }
+ System.out.println("============================");
+
// The problem is that we clear the events of the appender here, but it's possible that a thread from
// a previous test still generates events during the execution of the next test
appender.clear();
@@ -131,7 +137,7 @@ public class MdcInjectionFilterTest {
ExecutorService executor = (ExecutorService) executorFilter.getExecutor();
executor.shutdown();
while (!executor.isTerminated()) {
- System.out.println("Waiting for termination of " + executorFilter);
+ //System.out.println("Waiting for termination of " + executorFilter);
executor.awaitTermination(10, TimeUnit.MILLISECONDS);
}
}
@@ -235,12 +241,15 @@ public class MdcInjectionFilterTest {
simpleIoHandler.messageSentLatch.await();
simpleIoHandler.sessionIdleLatch.await();
simpleIoHandler.sessionClosedLatch.await();
- connector.dispose();
+ connector.dispose(true);
// make a copy to prevent ConcurrentModificationException
List<LoggingEvent> events = new ArrayList<LoggingEvent>(appender.events);
// verify that all logging events have correct MDC
for (LoggingEvent event : events) {
+ if (event.getLoggerName().startsWith("org.apache.mina.core.service.AbstractIoService")) {
+ continue;
+ }
for (MdcInjectionFilter.MdcKey mdcKey : MdcInjectionFilter.MdcKey.values()) {
String key = mdcKey.name();
Object value = event.getMDC(key);
@@ -271,7 +280,7 @@ public class MdcInjectionFilterTest {
simpleIoHandler.messageSentLatch.await();
simpleIoHandler.sessionIdleLatch.await();
simpleIoHandler.sessionClosedLatch.await();
- connector.dispose();
+ connector.dispose(true);
// make a copy to prevent ConcurrentModificationException
List<LoggingEvent> events = new ArrayList<LoggingEvent>(appender.events);