You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2016/08/18 03:46:54 UTC

mina git commit: o Added a counter to avoid creating new selector again and again. If the select() returns 0, we give it 10 other chances to get a correct return. o Fixed some Sonar violations

Repository: mina
Updated Branches:
  refs/heads/2.0 b1661ec24 -> 53fdc798e


o Added a counter to avoid creating new selector again and again. If the
select() returns 0, we give it 10 other chances to get a correct return.
o Fixed some Sonar violations

Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/53fdc798
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/53fdc798
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/53fdc798

Branch: refs/heads/2.0
Commit: 53fdc798eb8397fec4661f2222c6c901de8e0f11
Parents: b1661ec
Author: Emmanuel L�charny <el...@symas.com>
Authored: Thu Aug 18 05:42:25 2016 +0200
Committer: Emmanuel L�charny <el...@symas.com>
Committed: Thu Aug 18 05:42:25 2016 +0200

----------------------------------------------------------------------
 .../polling/AbstractPollingIoProcessor.java     | 52 ++++++++++++--------
 1 file changed, 32 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina/blob/53fdc798/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
index c0f7ba8..6097038 100644
--- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
+++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
@@ -67,7 +67,7 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
     /** A logger for this class */
-    private final static Logger LOG = LoggerFactory.getLogger(IoProcessor.class);
+    private static final Logger LOG = LoggerFactory.getLogger(IoProcessor.class);
 
     /**
      * A timeout used for the select, as we need to get out to deal with idle
@@ -76,7 +76,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
     private static final long SELECT_TIMEOUT = 1000L;
 
     /** A map containing the last Thread ID for each class */
-    private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<Class<?>, AtomicInteger>();
+    private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<>();
 
     /** This IoProcessor instance name */
     private final String threadName;
@@ -85,22 +85,22 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
     private final Executor executor;
 
     /** A Session queue containing the newly created sessions */
-    private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();
+    private final Queue<S> newSessions = new ConcurrentLinkedQueue<>();
 
     /** A queue used to store the sessions to be removed */
-    private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>();
+    private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>();
 
     /** A queue used to store the sessions to be flushed */
-    private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();
+    private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<>();
 
     /**
      * A queue used to store the sessions which have a trafficControl to be
      * updated
      */
-    private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>();
+    private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<>();
 
     /** The processor thread : it handles the incoming messages */
-    private final AtomicReference<Processor> processorRef = new AtomicReference<Processor>();
+    private final AtomicReference<Processor> processorRef = new AtomicReference<>();
 
     private long lastIdleCheckTime;
 
@@ -158,6 +158,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
     /**
      * {@inheritDoc}
      */
+    @Override
     public final boolean isDisposing() {
         return disposing;
     }
@@ -165,6 +166,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
     /**
      * {@inheritDoc}
      */
+    @Override
     public final boolean isDisposed() {
         return disposed;
     }
@@ -172,6 +174,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
     /**
      * {@inheritDoc}
      */
+    @Override
     public final void dispose() {
         if (disposed || disposing) {
             return;
@@ -252,7 +255,8 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
      * @return the state of the session
      */
     protected abstract SessionState getState(S session);
-
+    
+    
     /**
      * Tells if the session ready for writing
      * 
@@ -360,6 +364,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
     /**
      * {@inheritDoc}
      */
+    @Override
     public final void add(S session) {
         if (disposed || disposing) {
             throw new IllegalStateException("Already disposed.");
@@ -373,6 +378,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
     /**
      * {@inheritDoc}
      */
+    @Override
     public final void remove(S session) {
         scheduleRemove(session);
         startupProcessor();
@@ -387,6 +393,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
     /**
      * {@inheritDoc}
      */
+    @Override
     public void write(S session, WriteRequest writeRequest) {
         WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
 
@@ -400,6 +407,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
     /**
      * {@inheritDoc}
      */
+    @Override
     public final void flush(S session) {
         // add the session to the queue if it's not already
         // in the queue, then wake up the select()
@@ -454,7 +462,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
      * 
      * @throws IOException If we got an exception
      */
-    abstract protected void registerNewSelector() throws IOException;
+    protected abstract void registerNewSelector() throws IOException;
 
     /**
      * Check that the select() has not exited immediately just because of a
@@ -464,7 +472,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
      * @return <tt>true</tt> if a connection has been brutally closed.
      * @throws IOException If we got an exception
      */
-    abstract protected boolean isBrokenConnection() throws IOException;
+    protected abstract boolean isBrokenConnection() throws IOException;
 
     /**
      * Loops over the new sessions blocking queue and returns the number of
@@ -593,7 +601,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
         WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
         WriteRequest req;
 
-        List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
+        List<WriteRequest> failedRequests = new ArrayList<>();
 
         if ((req = writeRequestQueue.poll(session)) != null) {
             Object message = req.getMessage();
@@ -652,11 +660,9 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
         }
 
         // Process writes
-        if (isWritable(session) && !session.isWriteSuspended()) {
+        if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) {
             // add the session to the queue, if it's not already there
-            if (session.setScheduledForFlush(true)) {
-                flushingSessions.add(session);
-            }
+            flushingSessions.add(session);
         }
     }
 
@@ -707,7 +713,6 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
             }
 
             if (ret < 0) {
-                // scheduleRemove(session);
                 IoFilterChain filterChain = session.getFilterChain();
                 filterChain.fireInputClosed();
             }
@@ -828,7 +833,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
                     session.setCurrentWriteRequest(req);
                 }
 
-                int localWrittenBytes = 0;
+                int localWrittenBytes;
                 Object message = req.getMessage();
 
                 if (message instanceof IoBuffer) {
@@ -1025,6 +1030,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
     /**
      * {@inheritDoc}
      */
+    @Override
     public void updateTrafficControl(S session) {
         //
         try {
@@ -1054,6 +1060,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
 
             int nSessions = 0;
             lastIdleCheckTime = System.currentTimeMillis();
+            int nbTries = 10;
 
             for (;;) {
                 try {
@@ -1064,7 +1071,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
                     long t0 = System.currentTimeMillis();
                     int selected = select(SELECT_TIMEOUT);
                     long t1 = System.currentTimeMillis();
-                    long delta = (t1 - t0);
+                    long delta = t1 - t0;
 
                     if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) {
                         // Last chance : the select() may have been
@@ -1072,7 +1079,6 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
                         if (isBrokenConnection()) {
                             LOG.warn("Broken connection");
                         } else {
-                            LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
                             // Ok, we are hit by the nasty epoll
                             // spinning.
                             // Basically, there is a race condition
@@ -1086,7 +1092,13 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
                             // CPU.
                             // We have to destroy the selector, and
                             // register all the socket on a new one.
-                            registerNewSelector();
+                            if (nbTries == 0) {
+                                LOG.warn("Create a new selector. Selected is 0, delta = " + delta);
+                                registerNewSelector();
+                                nbTries = 10;
+                            } else {
+                                nbTries--;
+                            }
                         }
                     }