You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2016/01/28 16:53:57 UTC

mina git commit: Applied Radovan's patch. Should fix DIRMINA-1006

Repository: mina
Updated Branches:
  refs/heads/2.0 81b84d1f7 -> 2d6f82560


Applied Radovan's patch. Should fix DIRMINA-1006

Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/2d6f8256
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/2d6f8256
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/2d6f8256

Branch: refs/heads/2.0
Commit: 2d6f82560307399a4c6226686d3286ea8375066f
Parents: 81b84d1
Author: Emmanuel Lécharny <el...@symas.com>
Authored: Thu Jan 28 16:50:51 2016 +0100
Committer: Emmanuel Lécharny <el...@symas.com>
Committed: Thu Jan 28 16:50:51 2016 +0100

----------------------------------------------------------------------
 .../polling/AbstractPollingIoProcessor.java     | 90 +++++++++++---------
 1 file changed, 48 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina/blob/2d6f8256/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
index e524ec2..abd7045 100644
--- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
+++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
@@ -379,7 +379,9 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
     }
 
     private void scheduleRemove(S session) {
-        removingSessions.add(session);
+        if (!removingSessions.contains(session)) {
+            removingSessions.add(session);
+        }
     }
 
     /**
@@ -524,36 +526,38 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
     private int removeSessions() {
         int removedSessions = 0;
 
-        for (S session = removingSessions.poll(); session != null; session = removingSessions.poll()) {
+        for (S session = removingSessions.poll(); session != null;session = removingSessions.poll()) {
             SessionState state = getState(session);
 
             // Now deal with the removal accordingly to the session's state
             switch (state) {
-            case OPENED:
-                // Try to remove this session
-                if (removeNow(session)) {
-                    removedSessions++;
-                }
-
-                break;
-
-            case CLOSING:
-                // Skip if channel is already closed
-                break;
-
-            case OPENING:
-                // Remove session from the newSessions queue and
-                // remove it
-                newSessions.remove(session);
-
-                if (removeNow(session)) {
+                case OPENED:
+                    // Try to remove this session
+                    if (removeNow(session)) {
+                        removedSessions++;
+                    }
+                    
+                    break;
+    
+                case CLOSING:
+                    // Skip if channel is already closed
+                    // In any case, remove the session from the queue
                     removedSessions++;
-                }
-
-                break;
-
-            default:
-                throw new IllegalStateException(String.valueOf(state));
+                    break;
+    
+                case OPENING:
+                    // Remove session from the newSessions queue and
+                    // remove it
+                    newSessions.remove(session);
+    
+                    if (removeNow(session)) {
+                        removedSessions++;
+                    }
+    
+                    break;
+    
+                default:
+                    throw new IllegalStateException(String.valueOf(state));
             }
         }
 
@@ -570,9 +574,18 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
             IoFilterChain filterChain = session.getFilterChain();
             filterChain.fireExceptionCaught(e);
         } finally {
-            clearWriteRequestQueue(session);
-            ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
+            try {
+                clearWriteRequestQueue(session);
+                ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
+            } catch (Exception e) {
+                // The session was either destroyed or not at this point.
+                // We do not want any exception thrown from this "cleanup" code to change
+                // the return value by bubbling up.
+                IoFilterChain filterChain = session.getFilterChain();
+                filterChain.fireExceptionCaught(e);
+            }
         }
+        
         return false;
     }
 
@@ -1047,17 +1060,11 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
                     long t1 = System.currentTimeMillis();
                     long delta = (t1 - t0);
 
-                    if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
+                    if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) {
                         // Last chance : the select() may have been
                         // interrupted because we have had an closed channel.
                         if (isBrokenConnection()) {
                             LOG.warn("Broken connection");
-
-                            // we can reselect immediately
-                            // set back the flag to false
-                            wakeupCalled.getAndSet(false);
-
-                            continue;
                         } else {
                             LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
                             // Ok, we are hit by the nasty epoll
@@ -1075,12 +1082,6 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
                             // register all the socket on a new one.
                             registerNewSelector();
                         }
-
-                        // Set back the flag to false
-                        wakeupCalled.getAndSet(false);
-
-                        // and continue the loop
-                        continue;
                     }
 
                     // Manage newly created session first
@@ -1131,11 +1132,16 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
                     // Disconnect all sessions immediately if disposal has been
                     // requested so that we exit this loop eventually.
                     if (isDisposing()) {
+                        boolean hasKeys = false;
+                        
                         for (Iterator<S> i = allSessions(); i.hasNext();) {
                             scheduleRemove(i.next());
+                            hasKeys = true;
                         }
 
-                        wakeup();
+                        if (hasKeys) {
+                            wakeup();
+                        }
                     }
                 } catch (ClosedSelectorException cse) {
                     // If the selector has been closed, we can exit the loop