You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2008/03/22 19:06:32 UTC

svn commit: r640032 - in /mina/trunk/core/src/main/java/org/apache/mina/common: AbstractIoService.java AbstractPollingIoAcceptor.java

Author: trustin
Date: Sat Mar 22 11:06:32 2008
New Revision: 640032

URL: http://svn.apache.org/viewvc?rev=640032&view=rev
Log:
Fixed the same problem in AbstractPollingIoAcceptor, where wakeup() was called after disposal

Modified:
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java?rev=640032&r1=640031&r2=640032&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java Sat Mar 22 11:06:32 2008
@@ -50,7 +50,7 @@
                 s.setLastReadTime(s.getActivationTime());
                 s.setLastWriteTime(s.getActivationTime());
                 s.lastThroughputCalculationTime = s.getActivationTime();
-                
+
                 // Start idleness notification.
                 idleStatusChecker.addService(s);
             }
@@ -63,7 +63,7 @@
             public void sessionCreated(IoSession session) {}
             public void sessionDestroyed(IoSession session) {}
     };
-    
+
     /**
      * Current filter chain builder.
      */
@@ -73,7 +73,7 @@
      * Current handler.
      */
     private IoHandler handler;
-    
+
     private IoSessionDataStructureFactory sessionDataStructureFactory =
         new DefaultIoSessionDataStructureFactory();
 
@@ -85,7 +85,12 @@
     private final Executor executor;
     private final String threadName;
     private final boolean createdExecutor;
-    private final Object disposalLock = new Object();
+
+    /**
+     * A lock object which must be acquired when related resources are
+     * destroyed.
+     */
+    protected final Object disposalLock = new Object();
     private volatile boolean disposing;
     private volatile boolean disposed;
     private IoFuture disposalFuture;
@@ -96,7 +101,7 @@
     private final AtomicLong writtenMessages = new AtomicLong();
     private long lastReadTime;
     private long lastWriteTime;
-    
+
     private final AtomicLong scheduledWriteBytes = new AtomicLong();
     private final AtomicLong scheduledWriteMessages = new AtomicLong();
 
@@ -130,11 +135,11 @@
     private long lastIdleTimeForBoth;
     private long lastIdleTimeForRead;
     private long lastIdleTimeForWrite;
-    
+
     /**
      * The default {@link IoSessionConfig} which will be used to configure new sessions.
      */
-    private IoSessionConfig sessionConfig;
+    private final IoSessionConfig sessionConfig;
 
     protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
         if (sessionConfig == null) {
@@ -151,11 +156,11 @@
         this.listeners = new IoServiceListenerSupport(this);
         this.listeners.add(serviceActivationListener);
         this.sessionConfig = sessionConfig;
-        
+
         // Make JVM load the exception monitor before some transports
         // change the thread context class loader.
         ExceptionMonitor.getInstance();
-        
+
         if (executor == null) {
             this.executor = Executors.newCachedThreadPool();
             this.createdExecutor = true;
@@ -163,9 +168,9 @@
             this.executor = executor;
             this.createdExecutor = false;
         }
-        
+
         this.threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
-        
+
         executeWorker(idleStatusChecker.getNotifyingTask(), "idleStatusChecker");
     }
 
@@ -200,15 +205,15 @@
     public final boolean isActive() {
         return listeners.isActive();
     }
-    
+
     public final boolean isDisposing() {
         return disposing;
     }
-    
+
     public final boolean isDisposed() {
         return disposed;
     }
-    
+
     public final void dispose() {
         if (disposed) {
             return;
@@ -230,7 +235,7 @@
                 }
             }
         }
-        
+
         idleStatusChecker.getNotifyingTask().cancel();
         if (disposalFuture != null) {
             disposalFuture.awaitUninterruptibly();
@@ -249,7 +254,7 @@
 
         disposed = true;
     }
-    
+
     /**
      * Implement this method to release any acquired resources.  This method
      * is invoked only once by {@link #dispose()}.
@@ -343,11 +348,11 @@
 
         this.throughputCalculationInterval = throughputCalculationInterval;
     }
-    
+
     public final long getThroughputCalculationIntervalInMillis() {
         return throughputCalculationInterval * 1000L;
     }
-    
+
     public final double getReadBytesThroughput() {
         resetThroughput();
         return readBytesThroughput;
@@ -367,23 +372,23 @@
         resetThroughput();
         return writtenMessagesThroughput;
     }
-    
+
     public final double getLargestReadBytesThroughput() {
         return largestReadBytesThroughput;
     }
-    
+
     public final double getLargestWrittenBytesThroughput() {
         return largestWrittenBytesThroughput;
     }
-    
+
     public final double getLargestReadMessagesThroughput() {
         return largestReadMessagesThroughput;
     }
-    
+
     public final double getLargestWrittenMessagesThroughput() {
         return largestWrittenMessagesThroughput;
     }
-    
+
     private void resetThroughput() {
         if (getManagedSessionCount() == 0) {
             readBytesThroughput = 0;
@@ -400,17 +405,17 @@
             if (minInterval == 0 || interval < minInterval) {
                 return;
             }
-            
+
             long readBytes = this.readBytes.get();
             long writtenBytes = this.writtenBytes.get();
             long readMessages = this.readMessages.get();
             long writtenMessages = this.writtenMessages.get();
-            
+
             readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
             writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
             readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
             writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
-            
+
             if (readBytesThroughput > largestReadBytesThroughput) {
                 largestReadBytesThroughput = readBytesThroughput;
             }
@@ -423,16 +428,16 @@
             if (writtenMessagesThroughput > largestWrittenMessagesThroughput) {
                 largestWrittenMessagesThroughput = writtenMessagesThroughput;
             }
-           
+
             lastReadBytes = readBytes;
             lastWrittenBytes = writtenBytes;
             lastReadMessages = readMessages;
             lastWrittenMessages = writtenMessages;
-            
+
             lastThroughputCalculationTime = currentTime;
         }
     }
-    
+
     public final long getScheduledWriteBytes() {
         return scheduledWriteBytes.get();
     }
@@ -472,7 +477,7 @@
     public final long getLastWriteTime() {
         return lastWriteTime;
     }
-    
+
     protected final void setLastWriteTime(long lastWriteTime) {
         this.lastWriteTime = lastWriteTime;
     }
@@ -523,7 +528,7 @@
         if (idleTime < 0) {
             throw new IllegalArgumentException("Illegal idle time: " + idleTime);
         }
-        
+
         if (status == IdleStatus.BOTH_IDLE) {
             idleTimeForBoth = idleTime;
         } else if (status == IdleStatus.READER_IDLE) {
@@ -533,7 +538,7 @@
         } else {
             throw new IllegalArgumentException("Unknown idle status: " + status);
         }
-        
+
         if (idleTime == 0) {
             if (status == IdleStatus.BOTH_IDLE) {
                 idleCountForBoth = 0;
@@ -607,10 +612,10 @@
             throw new IllegalArgumentException("Unknown idle status: " + status);
         }
     }
-    
+
     protected final void notifyIdleness(long currentTime) {
         updateThroughput(currentTime);
-        
+
         synchronized (idlenessCheckLock) {
             notifyIdleness(
                     currentTime,
@@ -618,14 +623,14 @@
                     IdleStatus.BOTH_IDLE, Math.max(
                             getLastIoTime(),
                             getLastIdleTime(IdleStatus.BOTH_IDLE)));
-            
+
             notifyIdleness(
                     currentTime,
                     getIdleTimeInMillis(IdleStatus.READER_IDLE),
                     IdleStatus.READER_IDLE, Math.max(
                             getLastReadTime(),
                             getLastIdleTime(IdleStatus.READER_IDLE)));
-            
+
             notifyIdleness(
                     currentTime,
                     getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
@@ -634,7 +639,7 @@
                             getLastIdleTime(IdleStatus.WRITER_IDLE)));
         }
     }
-    
+
     private void notifyIdleness(
             long currentTime, long idleTime, IdleStatus status, long lastIoTime) {
         if (idleTime > 0 && lastIoTime != 0
@@ -667,7 +672,7 @@
     public final int getWriterIdleCount() {
         return getIdleCount(IdleStatus.WRITER_IDLE);
     }
-    
+
     public final int getBothIdleTime() {
         return getIdleTime(IdleStatus.BOTH_IDLE);
     }
@@ -717,7 +722,7 @@
     }
 
     public final Set<WriteFuture> broadcast(Object message) {
-        // Convert to Set.  We do not return a List here because only the 
+        // Convert to Set.  We do not return a List here because only the
         // direct caller of MessageBroadcaster knows the order of write
         // operations.
         final List<WriteFuture> futures = IoUtil.broadcast(
@@ -734,19 +739,19 @@
             }
         };
     }
-    
+
     protected final IoServiceListenerSupport getListeners() {
         return listeners;
     }
-    
+
     protected final IdleStatusChecker getIdleStatusChecker() {
         return idleStatusChecker;
     }
-    
+
     protected final void executeWorker(Runnable worker) {
         executeWorker(worker, null);
     }
-    
+
     protected final void executeWorker(Runnable worker, String suffix) {
         String actualThreadName = threadName;
         if (suffix != null) {
@@ -754,7 +759,7 @@
         }
         executor.execute(new NamePreservingRunnable(worker, actualThreadName));
     }
-    
+
     // TODO Figure out make it work without causing a compiler error / warning.
     @SuppressWarnings("unchecked")
     protected final void finishSessionInitialization(
@@ -795,14 +800,14 @@
             // DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now).
             session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future);
         }
-        
+
         if (sessionInitializer != null) {
             sessionInitializer.initializeSession(session, future);
         }
-        
+
         finishSessionInitialization0(session, future);
     }
-    
+
     /**
      * Implement this method to perform additional tasks required for session
      * initialization. Do not call this method directly;

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java?rev=640032&r1=640031&r2=640032&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java Sat Mar 22 11:06:32 2008
@@ -50,7 +50,7 @@
 
     private final Map<SocketAddress, H> boundHandles =
         Collections.synchronizedMap(new HashMap<SocketAddress, H>());
-    
+
     private final ServiceOperationFuture disposalFuture =
         new ServiceOperationFuture();
     private volatile boolean selectable;
@@ -66,7 +66,7 @@
     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass, int processorCount) {
         this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
     }
-    
+
     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<T> processor) {
         this(sessionConfig, null, processor, false);
     }
@@ -77,14 +77,14 @@
 
     private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor, boolean createdProcessor) {
         super(sessionConfig, executor);
-        
+
         if (processor == null) {
             throw new NullPointerException("processor");
         }
-        
+
         this.processor = processor;
         this.createdProcessor = createdProcessor;
-        
+
         try {
             init();
             selectable = true;
@@ -196,7 +196,7 @@
     private class Worker implements Runnable {
         public void run() {
             int nHandles = 0;
-            
+
             while (selectable) {
                 try {
                     // gets the number of keys that are ready to go
@@ -233,7 +233,7 @@
                     }
                 }
             }
-            
+
             if (selectable && isDisposing()) {
                 selectable = false;
                 try {
@@ -242,7 +242,11 @@
                     }
                 } finally {
                     try {
-                        destroy();
+                        synchronized (disposalLock) {
+                            if (isDisposing()) {
+                                destroy();
+                            }
+                        }
                     } catch (Exception e) {
                         ExceptionMonitor.getInstance().exceptionCaught(e);
                     } finally {
@@ -271,7 +275,7 @@
                 if (session == null) {
                     break;
                 }
-                
+
                 finishSessionInitialization(session, null, null);
 
                 // add the session to the SocketIoProcessor
@@ -295,16 +299,16 @@
             if (future == null) {
                 break;
             }
-            
+
             Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
             List<SocketAddress> localAddresses = future.getLocalAddresses();
-            
+
             try {
                 for (SocketAddress a: localAddresses) {
                     H handle = open(a);
                     newHandles.put(localAddress(handle), handle);
                 }
-                
+
                 boundHandles.putAll(newHandles);
 
                 // and notify.
@@ -326,7 +330,7 @@
                 }
             }
         }
-        
+
         return 0;
     }
 
@@ -343,7 +347,7 @@
             if (future == null) {
                 break;
             }
-            
+
             // close the channels
             for (SocketAddress a: future.getLocalAddresses()) {
                 H handle = boundHandles.remove(a);
@@ -360,10 +364,10 @@
                     cancelledHandles ++;
                 }
             }
-            
+
             future.setDone();
         }
-        
+
         return cancelledHandles;
     }