You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2016/08/18 03:46:54 UTC
mina git commit: o Added a counter to avoid creating new selector
again and again. If the select() returns 0,
we give it 10 other chances to get a correct return. o Fixed some Sonar
violations
Repository: mina
Updated Branches:
refs/heads/2.0 b1661ec24 -> 53fdc798e
o Added a counter to avoid creating new selector again and again. If the
select() returns 0, we give it 10 other chances to get a correct return.
o Fixed some Sonar violations
Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/53fdc798
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/53fdc798
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/53fdc798
Branch: refs/heads/2.0
Commit: 53fdc798eb8397fec4661f2222c6c901de8e0f11
Parents: b1661ec
Author: Emmanuel L�charny <el...@symas.com>
Authored: Thu Aug 18 05:42:25 2016 +0200
Committer: Emmanuel L�charny <el...@symas.com>
Committed: Thu Aug 18 05:42:25 2016 +0200
----------------------------------------------------------------------
.../polling/AbstractPollingIoProcessor.java | 52 ++++++++++++--------
1 file changed, 32 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina/blob/53fdc798/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
index c0f7ba8..6097038 100644
--- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
+++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
@@ -67,7 +67,7 @@ import org.slf4j.LoggerFactory;
*/
public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
/** A logger for this class */
- private final static Logger LOG = LoggerFactory.getLogger(IoProcessor.class);
+ private static final Logger LOG = LoggerFactory.getLogger(IoProcessor.class);
/**
* A timeout used for the select, as we need to get out to deal with idle
@@ -76,7 +76,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
private static final long SELECT_TIMEOUT = 1000L;
/** A map containing the last Thread ID for each class */
- private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<Class<?>, AtomicInteger>();
+ private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<>();
/** This IoProcessor instance name */
private final String threadName;
@@ -85,22 +85,22 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
private final Executor executor;
/** A Session queue containing the newly created sessions */
- private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();
+ private final Queue<S> newSessions = new ConcurrentLinkedQueue<>();
/** A queue used to store the sessions to be removed */
- private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>();
+ private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>();
/** A queue used to store the sessions to be flushed */
- private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();
+ private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<>();
/**
* A queue used to store the sessions which have a trafficControl to be
* updated
*/
- private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>();
+ private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<>();
/** The processor thread : it handles the incoming messages */
- private final AtomicReference<Processor> processorRef = new AtomicReference<Processor>();
+ private final AtomicReference<Processor> processorRef = new AtomicReference<>();
private long lastIdleCheckTime;
@@ -158,6 +158,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
/**
* {@inheritDoc}
*/
+ @Override
public final boolean isDisposing() {
return disposing;
}
@@ -165,6 +166,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
/**
* {@inheritDoc}
*/
+ @Override
public final boolean isDisposed() {
return disposed;
}
@@ -172,6 +174,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
/**
* {@inheritDoc}
*/
+ @Override
public final void dispose() {
if (disposed || disposing) {
return;
@@ -252,7 +255,8 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
* @return the state of the session
*/
protected abstract SessionState getState(S session);
-
+
+
/**
* Tells if the session ready for writing
*
@@ -360,6 +364,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
/**
* {@inheritDoc}
*/
+ @Override
public final void add(S session) {
if (disposed || disposing) {
throw new IllegalStateException("Already disposed.");
@@ -373,6 +378,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
/**
* {@inheritDoc}
*/
+ @Override
public final void remove(S session) {
scheduleRemove(session);
startupProcessor();
@@ -387,6 +393,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
/**
* {@inheritDoc}
*/
+ @Override
public void write(S session, WriteRequest writeRequest) {
WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
@@ -400,6 +407,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
/**
* {@inheritDoc}
*/
+ @Override
public final void flush(S session) {
// add the session to the queue if it's not already
// in the queue, then wake up the select()
@@ -454,7 +462,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
*
* @throws IOException If we got an exception
*/
- abstract protected void registerNewSelector() throws IOException;
+ protected abstract void registerNewSelector() throws IOException;
/**
* Check that the select() has not exited immediately just because of a
@@ -464,7 +472,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
* @return <tt>true</tt> if a connection has been brutally closed.
* @throws IOException If we got an exception
*/
- abstract protected boolean isBrokenConnection() throws IOException;
+ protected abstract boolean isBrokenConnection() throws IOException;
/**
* Loops over the new sessions blocking queue and returns the number of
@@ -593,7 +601,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
WriteRequest req;
- List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
+ List<WriteRequest> failedRequests = new ArrayList<>();
if ((req = writeRequestQueue.poll(session)) != null) {
Object message = req.getMessage();
@@ -652,11 +660,9 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
}
// Process writes
- if (isWritable(session) && !session.isWriteSuspended()) {
+ if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) {
// add the session to the queue, if it's not already there
- if (session.setScheduledForFlush(true)) {
- flushingSessions.add(session);
- }
+ flushingSessions.add(session);
}
}
@@ -707,7 +713,6 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
}
if (ret < 0) {
- // scheduleRemove(session);
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireInputClosed();
}
@@ -828,7 +833,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
session.setCurrentWriteRequest(req);
}
- int localWrittenBytes = 0;
+ int localWrittenBytes;
Object message = req.getMessage();
if (message instanceof IoBuffer) {
@@ -1025,6 +1030,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
/**
* {@inheritDoc}
*/
+ @Override
public void updateTrafficControl(S session) {
//
try {
@@ -1054,6 +1060,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
int nSessions = 0;
lastIdleCheckTime = System.currentTimeMillis();
+ int nbTries = 10;
for (;;) {
try {
@@ -1064,7 +1071,7 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
long t0 = System.currentTimeMillis();
int selected = select(SELECT_TIMEOUT);
long t1 = System.currentTimeMillis();
- long delta = (t1 - t0);
+ long delta = t1 - t0;
if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) {
// Last chance : the select() may have been
@@ -1072,7 +1079,6 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
if (isBrokenConnection()) {
LOG.warn("Broken connection");
} else {
- LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
// Ok, we are hit by the nasty epoll
// spinning.
// Basically, there is a race condition
@@ -1086,7 +1092,13 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
// CPU.
// We have to destroy the selector, and
// register all the socket on a new one.
- registerNewSelector();
+ if (nbTries == 0) {
+ LOG.warn("Create a new selector. Selected is 0, delta = " + delta);
+ registerNewSelector();
+ nbTries = 10;
+ } else {
+ nbTries--;
+ }
}
}