You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by ma...@apache.org on 2010/09/12 18:43:52 UTC

svn commit: r996337 - in /mina/trunk/mina-core/src: main/java/org/apache/mina/core/service/ test/java/org/apache/mina/core/service/ test/java/org/apache/mina/filter/logging/

Author: maarten
Date: Sun Sep 12 16:43:52 2010
New Revision: 996337

URL: http://svn.apache.org/viewvc?rev=996337&view=rev
Log:
Added an API to block until the ExecutorService of AbstractIoService is terminated.

Added an overloaded dispose method, public final void dispose(boolean awaitTermination), that can block until the ExecutorService of AbstractIoService is terminated.

Added:
    mina/trunk/mina-core/src/test/java/org/apache/mina/core/service/
    mina/trunk/mina-core/src/test/java/org/apache/mina/core/service/AbstractIoServiceTest.java
Modified:
    mina/trunk/mina-core/src/main/java/org/apache/mina/core/service/AbstractIoService.java
    mina/trunk/mina-core/src/main/java/org/apache/mina/core/service/IoService.java
    mina/trunk/mina-core/src/test/java/org/apache/mina/filter/logging/MdcInjectionFilterTest.java

Modified: mina/trunk/mina-core/src/main/java/org/apache/mina/core/service/AbstractIoService.java
URL: http://svn.apache.org/viewvc/mina/trunk/mina-core/src/main/java/org/apache/mina/core/service/AbstractIoService.java?rev=996337&r1=996336&r2=996337&view=diff
==============================================================================
--- mina/trunk/mina-core/src/main/java/org/apache/mina/core/service/AbstractIoService.java (original)
+++ mina/trunk/mina-core/src/main/java/org/apache/mina/core/service/AbstractIoService.java Sun Sep 12 16:43:52 2010
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.mina.core.IoUtil;
@@ -47,6 +48,8 @@ import org.apache.mina.core.session.IoSe
 import org.apache.mina.core.session.IoSessionInitializer;
 import org.apache.mina.util.ExceptionMonitor;
 import org.apache.mina.util.NamePreservingRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Base implementation of {@link IoService}s.
@@ -57,6 +60,8 @@ import org.apache.mina.util.NamePreservi
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public abstract class AbstractIoService implements IoService {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIoService.class);
     /** 
      * The unique number identifying the Service. It's incremented
      * for each new IoService created.
@@ -269,28 +274,48 @@ public abstract class AbstractIoService 
      * {@inheritDoc}
      */
     public final void dispose() {
-        if (disposed) {
-            return;
-        }
+      dispose(false);
+    }
 
-        synchronized (disposalLock) {
-            if (!disposing) {
-                disposing = true;
-            
-                try {
-                    dispose0();
-                } catch (Exception e) {
-                    ExceptionMonitor.getInstance().exceptionCaught(e);
-                }
+  /**
+   * {@inheritDoc}
+   */
+    public final void dispose(boolean awaitTermination) {
+      if (disposed) {
+          return;
+      }
+
+      synchronized (disposalLock) {
+          if (!disposing) {
+              disposing = true;
+
+              try {
+                  dispose0();
+              } catch (Exception e) {
+                  ExceptionMonitor.getInstance().exceptionCaught(e);
+              }
+          }
+      }
+
+      if (createdExecutor) {
+          ExecutorService e = (ExecutorService) executor;
+          e.shutdownNow();
+          if (awaitTermination) {
+            // Block the calling thread until the executor has terminated; the timeout of
+            // Integer.MAX_VALUE seconds makes this wait effectively unbounded.
+
+            try {
+              LOGGER.debug("awaitTermination on {} called by thread=[{}]", this, Thread.currentThread().getName());
+              e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
+              LOGGER.debug("awaitTermination on {} finished", this);
+            } catch (InterruptedException e1) {
+              LOGGER.warn("awaitTermination on [{}] was interrupted", this);
+              // Restore the interrupted status so callers can still observe the interruption
+              Thread.currentThread().interrupt();
             }
-        }
-        
-        if (createdExecutor) {
-            ExecutorService e = (ExecutorService) executor;
-            e.shutdownNow();
-        }
-
-        disposed = true;
+          }
+      }
+      disposed = true;
     }
 
     /**
@@ -474,7 +499,7 @@ public abstract class AbstractIoService 
     /**
      * Implement this method to perform additional tasks required for session
      * initialization. Do not call this method directly;
-     * {@link #finishSessionInitialization(IoSession, IoFuture, IoSessionInitializer)} will call
+     * {@link #initSession(IoSession, IoFuture, IoSessionInitializer)} will call
      * this method instead.
      */
     protected void finishSessionInitialization0(IoSession session,

Modified: mina/trunk/mina-core/src/main/java/org/apache/mina/core/service/IoService.java
URL: http://svn.apache.org/viewvc/mina/trunk/mina-core/src/main/java/org/apache/mina/core/service/IoService.java?rev=996337&r1=996336&r2=996337&view=diff
==============================================================================
--- mina/trunk/mina-core/src/main/java/org/apache/mina/core/service/IoService.java (original)
+++ mina/trunk/mina-core/src/main/java/org/apache/mina/core/service/IoService.java Sun Sep 12 16:43:52 2010
@@ -75,6 +75,17 @@ public interface IoService {
      */
     void dispose();
 
+  /**
+   * Releases any resources allocated by this service.  Please note that
+   * this method might block as long as there are any sessions managed by this service.
+   *
+   * Warning: calling this method from an <code>IoFutureListener</code> with <code>awaitTermination</code> = true
+   * will probably lead to a deadlock.
+   *
+   * @param awaitTermination When true this method will block until the underlying ExecutorService is terminated
+   */
+    void dispose(boolean awaitTermination);
+
     /**
      * Returns the handler which will handle all connections managed by this service.
      */

Added: mina/trunk/mina-core/src/test/java/org/apache/mina/core/service/AbstractIoServiceTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/mina-core/src/test/java/org/apache/mina/core/service/AbstractIoServiceTest.java?rev=996337&view=auto
==============================================================================
--- mina/trunk/mina-core/src/test/java/org/apache/mina/core/service/AbstractIoServiceTest.java (added)
+++ mina/trunk/mina-core/src/test/java/org/apache/mina/core/service/AbstractIoServiceTest.java Sun Sep 12 16:43:52 2010
@@ -0,0 +1,158 @@
+package org.apache.mina.core.service;
+
+import junit.framework.Assert;
+import org.apache.mina.core.future.CloseFuture;
+import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.future.IoFuture;
+import org.apache.mina.core.future.IoFutureListener;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
+import org.apache.mina.filter.logging.LoggingFilter;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.apache.mina.transport.socket.nio.NioSocketConnector;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+public class AbstractIoServiceTest {
+
+  private static final int PORT = 9123;
+
+  @Test
+  public void testDispose() throws IOException, InterruptedException {
+
+    List threadsBefore = getThreadNames();
+
+    final IoAcceptor acceptor = new NioSocketAcceptor();
+
+    acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
+    acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
+
+    acceptor.setHandler(  new ServerHandler() );
+
+    acceptor.getSessionConfig().setReadBufferSize( 2048 );
+    acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
+    acceptor.bind( new InetSocketAddress(PORT) );
+    System.out.println("Server running ...");
+
+    final NioSocketConnector connector = new NioSocketConnector();
+
+    // Set connect timeout.
+    connector.setConnectTimeoutMillis(30 * 1000L);
+
+    connector.setHandler(new ClientHandler());
+    connector.getFilterChain().addLast( "logger", new LoggingFilter() );
+    connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
+
+    // Start communication.
+    ConnectFuture cf = connector.connect(new InetSocketAddress("localhost", 9123));
+    cf.awaitUninterruptibly();
+
+    IoSession session = cf.getSession();
+
+    // send a message
+    session.write("Hello World!\r");
+
+    // wait until response is received
+    CountDownLatch latch = (CountDownLatch) session.getAttribute("latch");
+    latch.await();
+
+    // close the session
+    CloseFuture closeFuture = session.close(false);
+
+    System.out.println("session.close called");
+    //Thread.sleep(5);
+
+    // wait for session close and then dispose the connector
+    closeFuture.addListener(new IoFutureListener<IoFuture>() {
+
+      public void operationComplete(IoFuture future) {
+        System.out.println("managed session count=" + connector.getManagedSessionCount());
+        System.out.println("Disposing connector ...");
+        connector.dispose(true);
+        System.out.println("Disposing connector ... *finished*");
+
+      }
+    });
+
+    closeFuture.awaitUninterruptibly();    
+    acceptor.dispose(true);
+
+    List threadsAfter = getThreadNames();
+
+    System.out.println("threadsBefore = " + threadsBefore);
+    System.out.println("threadsAfter  = " + threadsAfter);
+
+    // Assert.assertEquals(threadsBefore, threadsAfter);
+
+  }
+
+
+  public static class ClientHandler extends IoHandlerAdapter {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger("CLIENT");
+
+    @Override
+    public void sessionCreated(IoSession session) throws Exception {
+      session.setAttribute("latch", new CountDownLatch(1));
+    }
+
+    @Override
+    public void messageReceived(IoSession session, Object message) throws Exception {
+      LOGGER.info("client: messageReceived("+session+", "+message+")");
+      CountDownLatch latch = (CountDownLatch) session.getAttribute("latch");
+      latch.countDown();
+    }
+
+    @Override
+    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
+      LOGGER.warn("exceptionCaught:", cause);
+    }
+  }
+
+  public static class ServerHandler extends IoHandlerAdapter {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger("SERVER");
+
+    @Override
+    public void messageReceived(IoSession session, Object message) throws Exception {
+      LOGGER.info("server: messageReceived("+session+", "+message+")");
+      session.write(message.toString());
+    }
+
+    @Override
+    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
+      LOGGER.warn("exceptionCaught:", cause);
+    }
+
+  }
+
+  public static void main(String[] args) throws IOException, InterruptedException {
+    new AbstractIoServiceTest().testDispose();
+  }
+
+  private List<String> getThreadNames() {
+      List<String> list = new ArrayList<String>();
+      int active = Thread.activeCount();
+      Thread[] threads = new Thread[active];
+      Thread.enumerate(threads);
+      for (Thread thread : threads) {
+          try {
+              String name = thread.getName();
+              list.add(name);
+          } catch (NullPointerException ignore) {
+          }
+      }
+      return list;
+  }
+
+}

Modified: mina/trunk/mina-core/src/test/java/org/apache/mina/filter/logging/MdcInjectionFilterTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/mina-core/src/test/java/org/apache/mina/filter/logging/MdcInjectionFilterTest.java?rev=996337&r1=996336&r2=996337&view=diff
==============================================================================
--- mina/trunk/mina-core/src/test/java/org/apache/mina/filter/logging/MdcInjectionFilterTest.java (original)
+++ mina/trunk/mina-core/src/test/java/org/apache/mina/filter/logging/MdcInjectionFilterTest.java Sun Sep 12 16:43:52 2010
@@ -95,15 +95,13 @@ public class MdcInjectionFilterTest {
 
     @After
     public void tearDown() throws Exception {
-        acceptor.dispose();
+        acceptor.dispose(true);
         org.apache.log4j.Logger.getRootLogger().setLevel(previousLevelRootLogger);
 
         destroy(executorFilter1);
         destroy(executorFilter2);
 
         List<String> after = getThreadNames();
-        System.out.println("");
-        System.out.println("after = " + after);
 
         // give acceptor some time to shut down
         Thread.sleep(50);
@@ -111,16 +109,24 @@ public class MdcInjectionFilterTest {
 
         int count = 0;
 
-        // NOTE: this is *not* intended to be a permanenet fix for this test-case.
-        // There just is no API to block until the ExecutorService of AbstractIoService is terminated.
+        // NOTE: this is *not* intended to be a permanent fix for this test-case.
+        // There used to be no API to block until the ExecutorService of AbstractIoService is terminated.
+        // The API exists now : dispose(true) so we should get rid of this code.
 
         while (contains(after, "Nio") && count++ < 10) {
             Thread.sleep(50);
             after = getThreadNames();
-            System.out.println("after = " + after);
+            System.out.println("** after = " + after);
         }
         System.out.println("============================");
-        
+
+      while (contains(after, "pool") && count++ < 10) {
+          Thread.sleep(50);
+          after = getThreadNames();
+          System.out.println("** after = " + after);
+      }
+      System.out.println("============================");
+
         // The problem is that we clear the events of the appender here, but it's possible that a thread from
         // a previous test still generates events during the execution of the next test
         appender.clear();
@@ -131,7 +137,7 @@ public class MdcInjectionFilterTest {
             ExecutorService executor = (ExecutorService) executorFilter.getExecutor();
             executor.shutdown();
             while (!executor.isTerminated()) {
-                System.out.println("Waiting for termination of " + executorFilter);  
+                //System.out.println("Waiting for termination of " + executorFilter);  
                 executor.awaitTermination(10, TimeUnit.MILLISECONDS);
             }
         }
@@ -235,12 +241,15 @@ public class MdcInjectionFilterTest {
         simpleIoHandler.messageSentLatch.await();
         simpleIoHandler.sessionIdleLatch.await();
         simpleIoHandler.sessionClosedLatch.await();
-        connector.dispose();
+        connector.dispose(true);
 
         // make a copy to prevent ConcurrentModificationException
         List<LoggingEvent> events = new ArrayList<LoggingEvent>(appender.events);
         // verify that all logging events have correct MDC
         for (LoggingEvent event : events) {
+            if (event.getLoggerName().startsWith("org.apache.mina.core.service.AbstractIoService")) {
+              continue;
+            }
             for (MdcInjectionFilter.MdcKey mdcKey : MdcInjectionFilter.MdcKey.values()) {
               String key = mdcKey.name();
               Object value = event.getMDC(key);
@@ -271,7 +280,7 @@ public class MdcInjectionFilterTest {
         simpleIoHandler.messageSentLatch.await();
         simpleIoHandler.sessionIdleLatch.await();
         simpleIoHandler.sessionClosedLatch.await();
-        connector.dispose();
+        connector.dispose(true);
 
         // make a copy to prevent ConcurrentModificationException
         List<LoggingEvent> events = new ArrayList<LoggingEvent>(appender.events);