You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2008/05/13 04:56:28 UTC

svn commit: r655717 - in /mina: sandbox/filter-traffic/src/test/java/org/apache/mina/filter/traffic/ sandbox/native/ trunk/core/src/main/java/org/apache/mina/common/ trunk/core/src/test/java/org/apache/mina/common/ trunk/core/src/test/java/org/apache/m...

Author: trustin
Date: Mon May 12 19:56:27 2008
New Revision: 655717

URL: http://svn.apache.org/viewvc?rev=655717&view=rev
Log:
Resolved DIRMINA-582 (IoService.getManagedSessions() should return a Map instead of a Set.)
* Replaced ConcurrentHashSet<IoSession> in IoServiceListenerSupport with ConcurrentHashMap<Long, IoSession>
* Fixed related compilation errors in various modules and abstract classes


Modified:
    mina/sandbox/filter-traffic/src/test/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
    mina/sandbox/native/pom.xml
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoService.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java
    mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java
    mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractBindTest.java
    mina/trunk/integration-jmx/src/main/java/org/apache/mina/integration/jmx/IoServiceMBean.java

Modified: mina/sandbox/filter-traffic/src/test/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
URL: http://svn.apache.org/viewvc/mina/sandbox/filter-traffic/src/test/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java?rev=655717&r1=655716&r2=655717&view=diff
==============================================================================
--- mina/sandbox/filter-traffic/src/test/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java (original)
+++ mina/sandbox/filter-traffic/src/test/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java Mon May 12 19:56:27 2008
@@ -64,11 +64,11 @@
  * @version $Rev$, $Date$
  */
 public class ReadThrottleFilter extends IoFilterAdapter {
-    
+
     private static final AtomicInteger globalBufferSize = new AtomicInteger();
     private static final Map<IoService, AtomicInteger> serviceBufferSizes =
         new CopyOnWriteMap<IoService, AtomicInteger>();
-    
+
     private static final Object globalResumeLock = new Object();
     private static long lastGlobalResumeTime = 0;
     private final Logger logger = LoggerFactory.getLogger(getClass());
@@ -81,7 +81,7 @@
     public static int getGlobalBufferSize() {
         return globalBufferSize.get();
     }
-    
+
     public static int getServiceBufferSize(IoService service) {
         AtomicInteger answer = serviceBufferSizes.get(service);
         if (answer == null) {
@@ -90,7 +90,7 @@
             return answer.get();
         }
     }
-    
+
     private static int increaseServiceBufferSize(IoService service, int increment) {
         AtomicInteger serviceBufferSize = serviceBufferSizes.get(service);
         if (serviceBufferSize == null) {
@@ -105,19 +105,19 @@
         }
         return serviceBufferSize.addAndGet(increment);
     }
-    
+
     private final AttributeKey STATE =
         new AttributeKey(ReadThrottleFilter.class, "state");
 
     private volatile ReadThrottlePolicy policy;
     private final MessageSizeEstimator messageSizeEstimator;
-    
+
     private volatile int maxSessionBufferSize;
     private volatile int maxServiceBufferSize;
     private volatile int maxGlobalBufferSize;
-    
+
     private final IoFilter enterFilter = new EnterFilter();
-    
+
     private final ScheduledExecutorService executor;
     private ScheduledFuture<?> resumeOthersFuture;
     private final AtomicInteger sessionCount = new AtomicInteger();
@@ -134,19 +134,19 @@
     public ReadThrottleFilter(ScheduledExecutorService executor) {
         this(executor, ReadThrottlePolicy.LOG);
     }
-    
+
     public ReadThrottleFilter(
             ScheduledExecutorService executor, ReadThrottlePolicy policy) {
         this(executor, policy, null);
     }
-    
+
     public ReadThrottleFilter(
             ScheduledExecutorService executor,
             ReadThrottlePolicy policy, MessageSizeEstimator messageSizeEstimator) {
         // 64KB, 64MB, 128MB.
         this(executor, policy, messageSizeEstimator, 65536, 1048576 * 64, 1048576 * 128);
     }
-    
+
     /**
      * Creates a new instance with the specified <tt>maxSessionBufferSize</tt>,
      * <tt>maxGlobalBufferSize</tt> and a new {@link DefaultMessageSizeEstimator}.
@@ -166,7 +166,7 @@
     /**
      * Creates a new instance with the specified <tt>maxSessionBufferSize</tt>,
      * <tt>maxGlobalBufferSize</tt> and {@link MessageSizeEstimator}.
-     * 
+     *
      * @param maxSessionBufferSize the maximum amount of data in the buffer of
      *                           the {@link ExecutorFilter} per {@link IoSession}.
      *                           Specify {@code 0} or a smaller value to disable.
@@ -179,7 +179,7 @@
      *                             a new {@link DefaultMessageSizeEstimator} is created.
      */
     public ReadThrottleFilter(
-            ScheduledExecutorService executor, 
+            ScheduledExecutorService executor,
             ReadThrottlePolicy policy, MessageSizeEstimator messageSizeEstimator,
             int maxSessionBufferSize, int maxServiceBufferSize, int maxGlobalBufferSize) {
         if (messageSizeEstimator == null) {
@@ -201,7 +201,7 @@
         if (policy == null) {
             throw new NullPointerException("policy");
         }
-        
+
         this.policy = policy;
     }
 
@@ -212,11 +212,11 @@
     public int getMaxSessionBufferSize() {
         return maxSessionBufferSize;
     }
-    
+
     public int getMaxServiceBufferSize() {
         return maxServiceBufferSize;
     }
-    
+
     /**
      * Returns the maximum amount of data in the buffer of the {@link ExecutorFilter}
      * for all {@link IoSession} whose {@link IoFilterChain} has been configured by
@@ -225,7 +225,7 @@
     public int getMaxGlobalBufferSize() {
         return maxGlobalBufferSize;
     }
-    
+
     /**
      * Sets the maximum amount of data in the buffer of the {@link ExecutorFilter}
      * per {@link IoSession}.  Specify {@code 0} or a smaller value to disable.
@@ -255,14 +255,14 @@
         }
         this.maxGlobalBufferSize = maxGlobalBufferSize;
     }
-    
+
     /**
      * Returns the size estimator currently in use.
      */
     public MessageSizeEstimator getMessageSizeEstimator() {
         return messageSizeEstimator;
     }
-    
+
     /**
      * Returns the current amount of data in the buffer of the {@link ExecutorFilter}
      * for the specified {@link IoSession}.
@@ -272,12 +272,12 @@
         if (state == null) {
             return 0;
         }
-        
+
         synchronized (state) {
             return state.sessionBufferSize;
         }
     }
-    
+
     @Override
     public void onPreAdd(
             IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
@@ -290,11 +290,11 @@
                     "You can't add the same filter instance more than once.  Create another instance and add it.");
         }
     }
-    
+
     @Override
     public void onPostAdd(
             IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
-        
+
         // My previous filter must be an ExecutorFilter.
         IoFilter lastFilter = null;
         for (IoFilterChain.Entry e: parent.getAll()) {
@@ -309,13 +309,13 @@
                             "an " + ExecutorFilter.class.getName() + " in the chain");
                 }
             }
-            
+
             lastFilter = currentFilter;
         }
-        
+
         // Add an entering filter before the ExecutorFilter.
         parent.getEntry(lastFilter).addBefore(name + ".preprocessor", enterFilter);
-        
+
         int previousSessionCount = sessionCount.getAndIncrement();
         if (previousSessionCount == 0) {
             synchronized (resumeOthersTask) {
@@ -334,7 +334,7 @@
         } catch (Exception e) {
             // Ignore.
         }
-        
+
         int currentSessionCount = sessionCount.decrementAndGet();
         if (currentSessionCount == 0) {
             synchronized (resumeOthersTask) {
@@ -354,20 +354,20 @@
     @Override
     public void filterSetTrafficMask(NextFilter nextFilter, IoSession session,
             TrafficMask trafficMask) throws Exception {
-        
+
         if (trafficMask.isReadable()) {
             State state = getState(session);
             boolean suspendedRead;
             synchronized (state) {
                 suspendedRead = state.suspendedRead;
             }
-            
+
             // Suppress resumeRead() if read is suspended by this filter.
             if (suspendedRead) {
                 trafficMask = trafficMask.and(TrafficMask.WRITE);
             }
         }
-        
+
         nextFilter.filterSetTrafficMask(session, trafficMask);
     }
 
@@ -382,13 +382,13 @@
                 // Ignore.
             }
         }
-        
+
         @Override
         public void onPostRemove(
                 IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
             parent.getSession().removeAttribute(STATE);
         }
-    
+
         @Override
         public void messageReceived(
                 NextFilter nextFilter, IoSession session, Object message) throws Exception {
@@ -396,7 +396,7 @@
             nextFilter.messageReceived(session, message);
         }
     }
-    
+
     private int estimateSize(Object message) {
         int size = messageSizeEstimator.estimateSize(message);
         if (size < 0) {
@@ -416,16 +416,16 @@
         int maxGlobalBufferSize = this.maxGlobalBufferSize;
         int maxServiceBufferSize = this.maxServiceBufferSize;
         int maxSessionBufferSize = this.maxSessionBufferSize;
-        
+
         ReadThrottlePolicy policy = getPolicy();
-        
+
         boolean enforcePolicy = false;
         int sessionBufferSize;
         synchronized (state) {
-            sessionBufferSize = (state.sessionBufferSize += size);
-            if ((maxSessionBufferSize != 0 && sessionBufferSize >= maxSessionBufferSize) ||
-                (maxServiceBufferSize != 0 && serviceBufferSize >= maxServiceBufferSize) ||
-                (maxGlobalBufferSize  != 0 && globalBufferSize  >= maxGlobalBufferSize)) {
+            sessionBufferSize = state.sessionBufferSize += size;
+            if (maxSessionBufferSize != 0 && sessionBufferSize >= maxSessionBufferSize ||
+                maxServiceBufferSize != 0 && serviceBufferSize >= maxServiceBufferSize ||
+                maxGlobalBufferSize  != 0 && globalBufferSize  >= maxGlobalBufferSize) {
                 enforcePolicy = true;
                 switch (policy) {
                 case EXCEPTION:
@@ -438,7 +438,7 @@
         if (logger.isDebugEnabled()) {
             logger.debug(getMessage(session, "  Entered - "));
         }
-        
+
         if (enforcePolicy) {
             switch (policy) {
             case CLOSE:
@@ -467,7 +467,7 @@
             logger.debug(getMessage(session, "Suspended - "));
         }
     }
-    
+
     private void exit(IoSession session, int size) {
         State state = getState(session);
 
@@ -475,7 +475,7 @@
         if (globalBufferSize < 0) {
             throw new IllegalStateException("globalBufferSize: " + globalBufferSize);
         }
-        
+
         int serviceBufferSize = increaseServiceBufferSize(session.getService(), -size);
         if (serviceBufferSize < 0) {
             throw new IllegalStateException("serviceBufferSize: " + serviceBufferSize);
@@ -484,12 +484,12 @@
         int maxGlobalBufferSize = this.maxGlobalBufferSize;
         int maxServiceBufferSize = this.maxServiceBufferSize;
         int maxSessionBufferSize = this.maxSessionBufferSize;
-        
+
         int sessionBufferSize;
-        
+
         boolean enforcePolicy = false;
         synchronized (state) {
-            sessionBufferSize = (state.sessionBufferSize -= size);
+            sessionBufferSize = state.sessionBufferSize -= size;
             if (sessionBufferSize < 0) {
                 throw new IllegalStateException("sessionBufferSize: " + sessionBufferSize);
             }
@@ -500,24 +500,24 @@
                 enforcePolicy = true;
             }
         }
-        
+
         if (logger.isDebugEnabled()) {
             logger.debug(getMessage(session, "   Exited - "));
         }
-        
+
         if (enforcePolicy) {
             session.resumeRead();
             if (logger.isDebugEnabled()) {
                 logger.debug(getMessage(session, "  Resumed - "));
             }
         }
-        
+
         resumeOthers();
     }
-    
+
     private void resumeOthers() {
         long currentTime = System.currentTimeMillis();
-        
+
         // Try to resume other sessions every other second.
         boolean resumeOthers;
         synchronized (globalResumeLock) {
@@ -528,21 +528,21 @@
                 resumeOthers = false;
             }
         }
-        
+
         if (resumeOthers) {
             int maxGlobalBufferSize = this.maxGlobalBufferSize;
             if (maxGlobalBufferSize == 0 || globalBufferSize.get() < maxGlobalBufferSize) {
                 List<IoService> inactiveServices = null;
                 for (IoService service: serviceBufferSizes.keySet()) {
                     resumeService(service);
-                    
+
                     if (!service.isActive()) {
                         if (inactiveServices == null) {
                             inactiveServices = new ArrayList<IoService>();
                         }
                         inactiveServices.add(service);
                     }
-                    
+
                     // Remove inactive services from the map.
                     if (inactiveServices != null) {
                         for (IoService s: inactiveServices) {
@@ -557,26 +557,26 @@
             }
         }
     }
-    
+
     private void resumeService(IoService service) {
         int maxServiceBufferSize = this.maxServiceBufferSize;
         if (maxServiceBufferSize == 0 || getServiceBufferSize(service) < maxServiceBufferSize) {
-            for (IoSession session: service.getManagedSessions()) {
+            for (IoSession session: service.getManagedSessions().values()) {
                 resume(session);
             }
         }
     }
-    
+
     private void resume(IoSession session) {
         State state = (State) session.getAttribute(STATE);
         if (state == null) {
             return;
         }
-        
+
         int maxSessionBufferSize = this.maxSessionBufferSize;
         boolean resume = false;
         synchronized (state) {
-            if ((maxSessionBufferSize == 0 || state.sessionBufferSize < maxSessionBufferSize)) {
+            if (maxSessionBufferSize == 0 || state.sessionBufferSize < maxSessionBufferSize) {
                 state.suspendedRead = false;
                 resume = true;
             }
@@ -592,7 +592,7 @@
 
     private void log(IoSession session, State state) {
         long currentTime = System.currentTimeMillis();
-        
+
         // Prevent log flood by logging every 3 seconds.
         boolean log;
         synchronized (state.logLock) {
@@ -603,20 +603,20 @@
                 log = false;
             }
         }
-        
+
         if (log) {
             logger.warn(getMessage(session));
         }
     }
-    
+
     private void raiseException(IoSession session) {
         throw new ReadFloodException(getMessage(session));
     }
-    
+
     private String getMessage(IoSession session) {
         return getMessage(session, "Read buffer flooded - ");
     }
-    
+
     private String getMessage(IoSession session, String prefix) {
         int  sessionLimit = maxSessionBufferSize;
         int  serviceLimit = maxServiceBufferSize;
@@ -634,7 +634,7 @@
             buf.append(getSessionBufferSize(session));
             buf.append(" / unlimited bytes, ");
         }
-        
+
         buf.append("service: ");
         if (serviceLimit != 0) {
             buf.append(getServiceBufferSize(session.getService()));
@@ -645,7 +645,7 @@
             buf.append(getServiceBufferSize(session.getService()));
             buf.append(" / unlimited bytes, ");
         }
-        
+
         buf.append("global: ");
         if (globalLimit != 0) {
             buf.append(getGlobalBufferSize());
@@ -656,10 +656,10 @@
             buf.append(getGlobalBufferSize());
             buf.append(" / unlimited bytes.");
         }
-        
+
         return buf.toString();
     }
-    
+
     private State getState(IoSession session) {
         State state = (State) session.getAttribute(STATE);
         if (state == null) {
@@ -671,7 +671,7 @@
         }
         return state;
     }
-    
+
     @Override
     public String toString() {
         return String.valueOf(getGlobalBufferSize()) + '/' + getMaxGlobalBufferSize();

Modified: mina/sandbox/native/pom.xml
URL: http://svn.apache.org/viewvc/mina/sandbox/native/pom.xml?rev=655717&r1=655716&r2=655717&view=diff
==============================================================================
--- mina/sandbox/native/pom.xml (original)
+++ mina/sandbox/native/pom.xml Mon May 12 19:56:27 2008
@@ -17,6 +17,10 @@
       <groupId>${groupId}</groupId>
       <artifactId>mina-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>tomcat</groupId>
+      <artifactId>tomcat-apr</artifactId>
+    </dependency>
   </dependencies>
 </project>
 

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java?rev=655717&r1=655716&r2=655717&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java Mon May 12 19:56:27 2008
@@ -22,6 +22,7 @@
 import java.util.AbstractSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -261,7 +262,7 @@
      */
     protected abstract IoFuture dispose0() throws Exception;
 
-    public final Set<IoSession> getManagedSessions() {
+    public final Map<Long, IoSession> getManagedSessions() {
         return listeners.getManagedSessions();
     }
 
@@ -726,7 +727,7 @@
         // direct caller of MessageBroadcaster knows the order of write
         // operations.
         final List<WriteFuture> futures = IoUtil.broadcast(
-                message, getManagedSessions());
+                message, getManagedSessions().values());
         return new AbstractSet<WriteFuture>() {
             @Override
             public Iterator<WriteFuture> iterator() {

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java?rev=655717&r1=655716&r2=655717&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java Mon May 12 19:56:27 2008
@@ -44,9 +44,9 @@
 
     private final Object lock = new Object();
     private final IoProcessor<T> processor = new ConnectionlessAcceptorProcessor();
-    private final Queue<AcceptorOperationFuture> registerQueue = 
+    private final Queue<AcceptorOperationFuture> registerQueue =
         new ConcurrentLinkedQueue<AcceptorOperationFuture>();
-    private final Queue<AcceptorOperationFuture> cancelQueue = 
+    private final Queue<AcceptorOperationFuture> cancelQueue =
         new ConcurrentLinkedQueue<AcceptorOperationFuture>();
     private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
     private final Map<SocketAddress, H> boundHandles =
@@ -270,7 +270,7 @@
             cancelQueue.clear();
             flushingSessions.clear();
         }
-        
+
         synchronized (lock) {
             if (worker == null) {
                 worker = new Worker();
@@ -326,7 +326,7 @@
                     }
                 }
             }
-            
+
             if (selectable && isDisposing()) {
                 selectable = false;
                 try {
@@ -351,7 +351,7 @@
                 }
 
                 if (isWritable(h)) {
-                    for (IoSession session : getManagedSessions()) {
+                    for (IoSession session : getManagedSessions().values()) {
                         scheduleFlush((T) session);
                     }
                 }
@@ -404,7 +404,7 @@
     private boolean flush(T session, long currentTime) throws Exception {
         // Clear OP_WRITE
         setInterestedInWrite(session, false);
-        
+
         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
         final int maxWrittenBytes =
             session.getConfig().getMaxReadBufferSize() +
@@ -421,7 +421,7 @@
                     }
                     session.setCurrentWriteRequest(req);
                 }
-    
+
                 IoBuffer buf = (IoBuffer) req.getMessage();
                 if (buf.remaining() == 0) {
                     // Clear and fire event
@@ -430,12 +430,12 @@
                     session.getFilterChain().fireMessageSent(req);
                     continue;
                 }
-    
+
                 SocketAddress destination = req.getDestination();
                 if (destination == null) {
                     destination = session.getRemoteAddress();
                 }
-    
+
                 int localWrittenBytes = send(session, buf, destination);
                 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
                     // Kernel buffer is full or wrote too much
@@ -443,7 +443,7 @@
                     return false;
                 } else {
                     setInterestedInWrite(session, false);
-    
+
                     // Clear and fire event
                     session.setCurrentWriteRequest(null);
                     writtenBytes += localWrittenBytes;
@@ -473,7 +473,7 @@
                     newHandles.put(localAddress(handle), handle);
                 }
                 boundHandles.putAll(newHandles);
-                
+
                 getListeners().fireServiceActivated();
                 req.setDone();
                 return newHandles.size();
@@ -493,7 +493,7 @@
                 }
             }
         }
-        
+
         return 0;
     }
 
@@ -521,10 +521,10 @@
                     nHandles ++;
                 }
             }
-            
+
             request.setDone();
         }
-        
+
         return nHandles;
     }
 
@@ -533,7 +533,7 @@
         if (currentTime - lastIdleCheckTime >= 1000) {
             lastIdleCheckTime = currentTime;
             IdleStatusChecker.notifyIdleness(
-                    getListeners().getManagedSessions().iterator(),
+                    getListeners().getManagedSessions().values().iterator(),
                     currentTime);
         }
     }

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java?rev=655717&r1=655716&r2=655717&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java Mon May 12 19:56:27 2008
@@ -46,7 +46,7 @@
         sessions.add(session);
         session.getCloseFuture().addListener(sessionCloseListener);
     }
-    
+
     public void addService(AbstractIoService service) {
         services.add(service);
     }
@@ -54,20 +54,20 @@
     public void removeSession(AbstractIoSession session) {
         sessions.remove(session);
     }
-    
+
     public void removeService(AbstractIoService service) {
         services.remove(service);
     }
-    
+
     public NotifyingTask getNotifyingTask() {
         return notifyingTask;
     }
-    
+
     public interface NotifyingTask extends Runnable {
         /**
          * Cancels this task.  Once canceled, {@link #run()} method will always return immediately.
          * To start this task again after calling this method, you have to create a new instance of
-         * {@link IdleStatusChecker} again. 
+         * {@link IdleStatusChecker} again.
          */
         void cancel();
     }
@@ -75,7 +75,7 @@
     private class NotifyingTaskImpl implements NotifyingTask {
         private volatile boolean cancelled;
         private volatile Thread thread;
-        
+
         public void run() {
             thread = Thread.currentThread();
             try {
@@ -84,7 +84,7 @@
                     long currentTime = System.currentTimeMillis();
                     notifyServices(currentTime);
                     notifySessions(currentTime);
-                    
+
                     try {
                         Thread.sleep(1000);
                     } catch (InterruptedException e) {
@@ -95,7 +95,7 @@
                 thread = null;
             }
         }
-        
+
         public void cancel() {
             cancelled = true;
             Thread thread = this.thread;
@@ -124,7 +124,7 @@
             }
         }
     }
-    
+
     private class SessionCloseListener implements IoFutureListener<IoFuture> {
         public void operationComplete(IoFuture future) {
             removeSession((AbstractIoSession) future.getSession());
@@ -148,16 +148,16 @@
     public static void notifyIdleness(IoService service, long currentTime) {
         notifyIdleness(service, currentTime, true);
     }
-    
+
     private static void notifyIdleness(IoService service, long currentTime, boolean includeSessions) {
         if (!(service instanceof AbstractIoService)) {
             return;
         }
-        
+
         ((AbstractIoService) service).notifyIdleness(currentTime);
-        
+
         if (includeSessions) {
-            notifyIdleness(service.getManagedSessions().iterator(), currentTime);
+            notifyIdleness(service.getManagedSessions().values().iterator(), currentTime);
         }
     }
 
@@ -176,21 +176,21 @@
                     IdleStatus.BOTH_IDLE, Math.max(
                             s.getLastIoTime(),
                             s.getLastIdleTime(IdleStatus.BOTH_IDLE)));
-            
+
             notifyIdleSession1(
                     s, currentTime,
                     s.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
                     IdleStatus.READER_IDLE, Math.max(
                             s.getLastReadTime(),
                             s.getLastIdleTime(IdleStatus.READER_IDLE)));
-            
+
             notifyIdleSession1(
                     s, currentTime,
                     s.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
                     IdleStatus.WRITER_IDLE, Math.max(
                             s.getLastWriteTime(),
                             s.getLastIdleTime(IdleStatus.WRITER_IDLE)));
-    
+
             notifyWriteTimeout(s, currentTime);
             updateThroughput(s, currentTime);
         } else {
@@ -200,14 +200,14 @@
                     IdleStatus.BOTH_IDLE, Math.max(
                             session.getLastIoTime(),
                             session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
-            
+
             notifyIdleSession0(
                     session, currentTime,
                     session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
                     IdleStatus.READER_IDLE, Math.max(
                             session.getLastReadTime(),
                             session.getLastIdleTime(IdleStatus.READER_IDLE)));
-            
+
             notifyIdleSession0(
                     session, currentTime,
                     session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoService.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoService.java?rev=655717&r1=655716&r2=655717&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoService.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoService.java Mon May 12 19:56:27 2008
@@ -20,6 +20,7 @@
 package org.apache.mina.common;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -78,11 +79,13 @@
     void setHandler(IoHandler handler);
 
     /**
-     * Returns all sessions which are currently managed by this service.
+     * Returns the map of all sessions which are currently managed by this
+     * service.  The key of map is the {@link IoSession#getId() ID} of the
+     * session.
      *
      * @return the sessions. An empty collection if there's no session.
      */
-    Set<IoSession> getManagedSessions();
+    Map<Long, IoSession> getManagedSessions();
 
     /**
      * Returns the number of all sessions which are currently managed by this

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java?rev=655717&r1=655716&r2=655717&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java Mon May 12 19:56:27 2008
@@ -21,12 +21,12 @@
 
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.mina.util.ConcurrentHashSet;
-
 /**
  * A helper which provides addition and removal of {@link IoServiceListener}s and firing
  * events.
@@ -48,13 +48,13 @@
     /**
      * Tracks managed sessions.
      */
-    private final Set<IoSession> managedSessions = new ConcurrentHashSet<IoSession>();
+    private final ConcurrentMap<Long, IoSession> managedSessions = new ConcurrentHashMap<Long, IoSession>();
 
     /**
      * Read only version of {@link #managedSessions}.
      */
-    private final Set<IoSession> readOnlyManagedSessions = Collections.unmodifiableSet(managedSessions);
-    
+    private final Map<Long, IoSession> readOnlyManagedSessions = Collections.unmodifiableMap(managedSessions);
+
     private final AtomicBoolean activated = new AtomicBoolean();
     private volatile long activationTime;
     private volatile int largestManagedSessionCount;
@@ -83,23 +83,23 @@
     public void remove(IoServiceListener listener) {
         listeners.remove(listener);
     }
-    
+
     public long getActivationTime() {
         return activationTime;
     }
 
-    public Set<IoSession> getManagedSessions() {
+    public Map<Long, IoSession> getManagedSessions() {
         return readOnlyManagedSessions;
     }
-    
+
     public int getManagedSessionCount() {
         return managedSessions.size();
     }
-    
+
     public int getLargestManagedSessionCount() {
         return largestManagedSessionCount;
     }
-    
+
     public long getCumulativeManagedSessionCount() {
         return cumulativeManagedSessionCount;
     }
@@ -127,7 +127,7 @@
             }
         }
     }
-    
+
     /**
      * Calls {@link IoServiceListener#serviceIdle(IoService, IdleStatus)}
      * for all registered listeners.
@@ -181,10 +181,10 @@
         }
 
         // If already registered, ignore.
-        if (!managedSessions.add(session)) {
+        if (managedSessions.putIfAbsent(Long.valueOf(session.getId()), session) != null) {
             return;
         }
-        
+
         // If the first connector session, fire a virtual service activation event.
         if (firstSession) {
             fireServiceActivated();
@@ -215,7 +215,7 @@
      */
     public void fireSessionDestroyed(IoSession session) {
         // Try to remove the remaining empty session set after removal.
-        if (!managedSessions.remove(session)) {
+        if (managedSessions.remove(Long.valueOf(session.getId())) == null) {
             return;
         }
 
@@ -257,7 +257,7 @@
         Object lock = new Object();
         IoFutureListener<IoFuture> listener = new LockNotifyingListener(lock);
 
-        for (IoSession s : managedSessions) {
+        for (IoSession s : managedSessions.values()) {
             s.close().addListener(listener);
         }
 

Modified: mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java?rev=655717&r1=655716&r2=655717&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java Mon May 12 19:56:27 2008
@@ -106,7 +106,7 @@
         handlerControl.verify();
 
         Assert.assertEquals(1, support.getManagedSessions().size());
-        Assert.assertTrue(support.getManagedSessions().contains(session));
+        Assert.assertSame(session, support.getManagedSessions().get(session.getId()));
 
         // Test destruction & other side effects
         listenerControl.reset();
@@ -126,7 +126,7 @@
 
         Assert.assertTrue(session.isClosing());
         Assert.assertEquals(0, support.getManagedSessions().size());
-        Assert.assertFalse(support.getManagedSessions().contains(session));
+        Assert.assertNull(support.getManagedSessions().get(session.getId()));
     }
 
     public void testDisconnectOnUnbind() throws Exception {
@@ -207,7 +207,7 @@
 
         Assert.assertTrue(session.isClosing());
         Assert.assertEquals(0, support.getManagedSessions().size());
-        Assert.assertFalse(support.getManagedSessions().contains(session));
+        Assert.assertNull(support.getManagedSessions().get(session.getId()));
     }
 
     public void testConnectorActivation() throws Exception {
@@ -263,6 +263,6 @@
         handlerControl.verify();
 
         Assert.assertEquals(0, support.getManagedSessions().size());
-        Assert.assertFalse(support.getManagedSessions().contains(session));
+        Assert.assertNull(support.getManagedSessions().get(session.getId()));
     }
 }

Modified: mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractBindTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractBindTest.java?rev=655717&r1=655716&r2=655717&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractBindTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractBindTest.java Mon May 12 19:56:27 2008
@@ -172,7 +172,7 @@
         // Wait for the server side sessions to be created.
         Thread.sleep(500);
 
-        Collection<IoSession> managedSessions = acceptor.getManagedSessions();
+        Collection<IoSession> managedSessions = acceptor.getManagedSessions().values();
         Assert.assertEquals(5, managedSessions.size());
 
         acceptor.unbind();

Modified: mina/trunk/integration-jmx/src/main/java/org/apache/mina/integration/jmx/IoServiceMBean.java
URL: http://svn.apache.org/viewvc/mina/trunk/integration-jmx/src/main/java/org/apache/mina/integration/jmx/IoServiceMBean.java?rev=655717&r1=655716&r2=655717&view=diff
==============================================================================
--- mina/trunk/integration-jmx/src/main/java/org/apache/mina/integration/jmx/IoServiceMBean.java (original)
+++ mina/trunk/integration-jmx/src/main/java/org/apache/mina/integration/jmx/IoServiceMBean.java Mon May 12 19:56:27 2008
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,6 +21,7 @@
 import java.util.List;
 import java.util.Set;
 
+import javax.management.MBeanOperationInfo;
 import javax.management.MBeanParameterInfo;
 import javax.management.ObjectName;
 import javax.management.modelmbean.ModelMBeanOperationInfo;
@@ -33,7 +34,7 @@
 
 /**
  * A JMX MBean wrapper for an {@link IoSession}.
- * 
+ *
  * @author The Apache MINA Project (dev@mina.apache.org)
  * @version $Rev$, $Date$
  */
@@ -48,7 +49,7 @@
         id = "0x" + id;
         return id;
     }
-    
+
     public IoServiceMBean(IoService source) {
         super(source);
     }
@@ -57,35 +58,35 @@
     protected Object invoke0(String name, Object[] params, String[] signature) throws Exception {
         if (name.equals("findSessions")) {
             IoSessionFinder finder = new IoSessionFinder((String) params[0]);
-            return finder.find(getSource().getManagedSessions());
+            return finder.find(getSource().getManagedSessions().values());
         }
-        
+
         if (name.equals("findAndRegisterSessions")) {
             IoSessionFinder finder = new IoSessionFinder((String) params[0]);
             Set<IoSession> registeredSessions = new LinkedHashSet<IoSession>();
-            for (IoSession s: finder.find(getSource().getManagedSessions())) {
+            for (IoSession s: finder.find(getSource().getManagedSessions().values())) {
                 try {
                     getServer().registerMBean(
                             new IoSessionMBean(s),
                             new ObjectName(
-                                    getName().getDomain() + 
-                                    ":type=session,name=" + 
+                                    getName().getDomain() +
+                                    ":type=session,name=" +
                                     getSessionIdAsString(s.getId())));
                     registeredSessions.add(s);
                 } catch (Exception e) {
                     logger.warn("Failed to register a session as a MBean: " + s, e);
                 }
             }
-            
+
             return registeredSessions;
         }
-        
+
         if (name.equals("findAndProcessSessions")) {
             IoSessionFinder finder = new IoSessionFinder((String) params[0]);
             String command = (String) params[1];
             Object expr = Ognl.parseExpression(command);
-            Set<IoSession> matches = finder.find(getSource().getManagedSessions());
-            
+            Set<IoSession> matches = finder.find(getSource().getManagedSessions().values());
+
             for (IoSession s: matches) {
                 try {
                     Ognl.getValue(expr, s);
@@ -95,7 +96,7 @@
             }
             return matches;
         }
-        
+
         return super.invoke0(name, params, signature);
     }
 
@@ -106,13 +107,13 @@
                 new MBeanParameterInfo[] {
                         new MBeanParameterInfo(
                                 "ognlQuery", String.class.getName(), "a boolean OGNL expression")
-                }, Set.class.getName(), ModelMBeanOperationInfo.INFO));
+                }, Set.class.getName(), MBeanOperationInfo.INFO));
         operations.add(new ModelMBeanOperationInfo(
                 "findAndRegisterSessions", "findAndRegisterSessions",
                 new MBeanParameterInfo[] {
                         new MBeanParameterInfo(
                                 "ognlQuery", String.class.getName(), "a boolean OGNL expression")
-                }, Set.class.getName(), ModelMBeanOperationInfo.ACTION_INFO));
+                }, Set.class.getName(), MBeanOperationInfo.ACTION_INFO));
         operations.add(new ModelMBeanOperationInfo(
                 "findAndProcessSessions", "findAndProcessSessions",
                 new MBeanParameterInfo[] {
@@ -120,7 +121,7 @@
                                 "ognlQuery", String.class.getName(), "a boolean OGNL expression"),
                         new MBeanParameterInfo(
                                 "ognlCommand", String.class.getName(), "an OGNL expression that modifies the state of the sessions in the match result")
-                }, Set.class.getName(), ModelMBeanOperationInfo.ACTION_INFO));
+                }, Set.class.getName(), MBeanOperationInfo.ACTION_INFO));
     }
 
     @Override
@@ -130,13 +131,13 @@
                 "(newSession|broadcast|(add|remove)Listener)")) {
             return false;
         }
-        
+
         if ((methodName.equals("bind") || methodName.equals("unbind")) &&
                 (paramTypes.length > 1 ||
-                        (paramTypes.length == 1 && !SocketAddress.class.isAssignableFrom(paramTypes[0])))) {
+                        paramTypes.length == 1 && !SocketAddress.class.isAssignableFrom(paramTypes[0]))) {
             return false;
         }
-                
+
         return super.isOperation(methodName, paramTypes);
     }
 }