You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2016/12/07 18:51:20 UTC
[1/4] mina git commit: o Added some missing Javadoc o Fixed some
Sonarlint warnings
Repository: mina
Updated Branches:
refs/heads/2.0 bf0254f34 -> 7c080890b
o Added some missing Javadoc
o Fixed some Sonarlint warnings
Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/9b26714b
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/9b26714b
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/9b26714b
Branch: refs/heads/2.0
Commit: 9b26714b1b59b7f507ddc3dfca941af323062e8d
Parents: bf0254f
Author: Emmanuel L�charny <el...@symas.com>
Authored: Wed Dec 7 11:18:53 2016 +0100
Committer: Emmanuel L�charny <el...@symas.com>
Committed: Wed Dec 7 11:18:53 2016 +0100
----------------------------------------------------------------------
.../core/polling/AbstractPollingIoAcceptor.java | 180 +--
.../polling/AbstractPollingIoConnector.java | 276 +++--
.../polling/AbstractPollingIoProcessor.java | 1124 +++++++++---------
3 files changed, 823 insertions(+), 757 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina/blob/9b26714b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
index 86cc31f..bf1bbf0 100644
--- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
+++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
@@ -65,6 +65,8 @@ import org.apache.mina.util.ExceptionMonitor;
* by the subclassing implementation.
*
* @see NioSocketAcceptor for a example of implementation
+ * @param <H> The type of IoHandler
+ * @param <S> The type of IoSession
*
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
@@ -435,8 +437,12 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
* The loop is stopped when all the bound handlers are unbound.
*/
private class Acceptor implements Runnable {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void run() {
- assert (acceptorRef.get() == this);
+ assert acceptorRef.get() == this;
int nHandles = 0;
@@ -466,16 +472,16 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
acceptorRef.set(null);
if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
- assert (acceptorRef.get() != this);
+ assert acceptorRef.get() != this;
break;
}
if (!acceptorRef.compareAndSet(null, this)) {
- assert (acceptorRef.get() != this);
+ assert acceptorRef.get() != this;
break;
}
- assert (acceptorRef.get() == this);
+ assert acceptorRef.get() == this;
}
if (selected > 0) {
@@ -553,106 +559,106 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
session.getProcessor().add(session);
}
}
- }
- /**
- * Sets up the socket communications. Sets items such as:
- * <p/>
- * Blocking
- * Reuse address
- * Receive buffer size
- * Bind to listen port
- * Registers OP_ACCEPT for selector
- */
- private int registerHandles() {
- for (;;) {
- // The register queue contains the list of services to manage
- // in this acceptor.
- AcceptorOperationFuture future = registerQueue.poll();
-
- if (future == null) {
- return 0;
- }
-
- // We create a temporary map to store the bound handles,
- // as we may have to remove them all if there is an exception
- // during the sockets opening.
- Map<SocketAddress, H> newHandles = new ConcurrentHashMap<>();
- List<SocketAddress> localAddresses = future.getLocalAddresses();
-
- try {
- // Process all the addresses
- for (SocketAddress a : localAddresses) {
- H handle = open(a);
- newHandles.put(localAddress(handle), handle);
+ /**
+ * Sets up the socket communications. Sets items such as:
+ * <p/>
+ * Blocking
+ * Reuse address
+ * Receive buffer size
+ * Bind to listen port
+ * Registers OP_ACCEPT for selector
+ */
+ private int registerHandles() {
+ for (;;) {
+ // The register queue contains the list of services to manage
+ // in this acceptor.
+ AcceptorOperationFuture future = registerQueue.poll();
+
+ if (future == null) {
+ return 0;
}
- // Everything went ok, we can now update the map storing
- // all the bound sockets.
- boundHandles.putAll(newHandles);
+ // We create a temporary map to store the bound handles,
+ // as we may have to remove them all if there is an exception
+ // during the sockets opening.
+ Map<SocketAddress, H> newHandles = new ConcurrentHashMap<>();
+ List<SocketAddress> localAddresses = future.getLocalAddresses();
- // and notify.
- future.setDone();
-
- return newHandles.size();
- } catch (Exception e) {
- // We store the exception in the future
- future.setException(e);
- } finally {
- // Roll back if failed to bind all addresses.
- if (future.getException() != null) {
- for (H handle : newHandles.values()) {
- try {
- close(handle);
- } catch (Exception e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
+ try {
+ // Process all the addresses
+ for (SocketAddress a : localAddresses) {
+ H handle = open(a);
+ newHandles.put(localAddress(handle), handle);
}
- // Wake up the selector to be sure we will process the newly bound handle
- // and not block forever in the select()
- wakeup();
+ // Everything went ok, we can now update the map storing
+ // all the bound sockets.
+ boundHandles.putAll(newHandles);
+
+ // and notify.
+ future.setDone();
+
+ return newHandles.size();
+ } catch (Exception e) {
+ // We store the exception in the future
+ future.setException(e);
+ } finally {
+ // Roll back if failed to bind all addresses.
+ if (future.getException() != null) {
+ for (H handle : newHandles.values()) {
+ try {
+ close(handle);
+ } catch (Exception e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+
+ // Wake up the selector to be sure we will process the newly bound handle
+ // and not block forever in the select()
+ wakeup();
+ }
}
}
}
- }
- /**
- * This method just checks to see if anything has been placed into the
- * cancellation queue. The only thing that should be in the cancelQueue
- * is CancellationRequest objects and the only place this happens is in
- * the doUnbind() method.
- */
- private int unregisterHandles() {
- int cancelledHandles = 0;
- for (;;) {
- AcceptorOperationFuture future = cancelQueue.poll();
- if (future == null) {
- break;
- }
+ /**
+ * This method just checks to see if anything has been placed into the
+ * cancellation queue. The only thing that should be in the cancelQueue
+ * is CancellationRequest objects and the only place this happens is in
+ * the doUnbind() method.
+ */
+ private int unregisterHandles() {
+ int cancelledHandles = 0;
+ for (;;) {
+ AcceptorOperationFuture future = cancelQueue.poll();
+ if (future == null) {
+ break;
+ }
- // close the channels
- for (SocketAddress a : future.getLocalAddresses()) {
- H handle = boundHandles.remove(a);
+ // close the channels
+ for (SocketAddress a : future.getLocalAddresses()) {
+ H handle = boundHandles.remove(a);
- if (handle == null) {
- continue;
- }
+ if (handle == null) {
+ continue;
+ }
- try {
- close(handle);
- wakeup(); // wake up again to trigger thread death
- } catch (Exception e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- } finally {
- cancelledHandles++;
+ try {
+ close(handle);
+ wakeup(); // wake up again to trigger thread death
+ } catch (Exception e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ } finally {
+ cancelledHandles++;
+ }
}
+
+ future.setDone();
}
- future.setDone();
+ return cancelledHandles;
}
-
- return cancelledHandles;
}
/**
http://git-wip-us.apache.org/repos/asf/mina/blob/9b26714b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoConnector.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoConnector.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoConnector.java
index ad68174..32a3956 100644
--- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoConnector.java
+++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoConnector.java
@@ -59,16 +59,18 @@ import org.apache.mina.util.ExceptionMonitor;
* provided by the subclassing implementation.
*
* @see NioSocketConnector for a example of implementation
+ * @param <H> The type of IoHandler
+ * @param <S> The type of IoSession
*
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
-public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H> extends AbstractIoConnector {
+public abstract class AbstractPollingIoConnector<S extends AbstractIoSession, H> extends AbstractIoConnector {
- private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
+ private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<>();
- private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
+ private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<>();
- private final IoProcessor<T> processor;
+ private final IoProcessor<S> processor;
private final boolean createdProcessor;
@@ -77,7 +79,7 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
private volatile boolean selectable;
/** The connector thread */
- private final AtomicReference<Connector> connectorRef = new AtomicReference<Connector>();
+ private final AtomicReference<Connector> connectorRef = new AtomicReference<>();
/**
* Constructor for {@link AbstractPollingIoConnector}. You need to provide a
@@ -93,8 +95,8 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
* a {@link Class}�of {@link IoProcessor} for the associated
* {@link IoSession} type.
*/
- protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
- this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
+ protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
+ this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true);
}
/**
@@ -113,9 +115,9 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
* @param processorCount
* the amount of processor to instantiate for the pool
*/
- protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass,
+ protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
int processorCount) {
- this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
+ this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true);
}
/**
@@ -133,7 +135,7 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
* {@link IoHandler} and processing the chains of
* {@link IoFilter}
*/
- protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, IoProcessor<T> processor) {
+ protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, IoProcessor<S> processor) {
this(sessionConfig, null, processor, false);
}
@@ -156,7 +158,7 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
* {@link IoHandler} and processing the chains of
* {@link IoFilter}
*/
- protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor) {
+ protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor) {
this(sessionConfig, executor, processor, false);
}
@@ -182,7 +184,7 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
* tagging the processor as automatically created, so it will be
* automatically disposed
*/
- private AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor,
+ private AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
boolean createdProcessor) {
super(sessionConfig, executor);
@@ -279,7 +281,7 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
* @throws Exception
* any exception thrown by the underlying systems calls
*/
- protected abstract T newSession(IoProcessor<T> processor, H handle) throws Exception;
+ protected abstract S newSession(IoProcessor<S> processor, H handle) throws Exception;
/**
* Close a client socket.
@@ -367,7 +369,7 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
handle = newHandle(localAddress);
if (connect(handle, remoteAddress)) {
ConnectFuture future = new DefaultConnectFuture();
- T session = newSession(processor, handle);
+ S session = newSession(processor, handle);
initSession(session, future, sessionInitializer);
// Forward the remaining process to the IoProcessor.
session.getProcessor().add(session);
@@ -413,116 +415,13 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
}
}
- private int registerNew() {
- int nHandles = 0;
- for (;;) {
- ConnectionRequest req = connectQueue.poll();
- if (req == null) {
- break;
- }
-
- H handle = req.handle;
- try {
- register(handle, req);
- nHandles++;
- } catch (Exception e) {
- req.setException(e);
- try {
- close(handle);
- } catch (Exception e2) {
- ExceptionMonitor.getInstance().exceptionCaught(e2);
- }
- }
- }
- return nHandles;
- }
-
- private int cancelKeys() {
- int nHandles = 0;
-
- for (;;) {
- ConnectionRequest req = cancelQueue.poll();
-
- if (req == null) {
- break;
- }
-
- H handle = req.handle;
-
- try {
- close(handle);
- } catch (Exception e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- } finally {
- nHandles++;
- }
- }
-
- if (nHandles > 0) {
- wakeup();
- }
-
- return nHandles;
- }
-
- /**
- * Process the incoming connections, creating a new session for each valid
- * connection.
- */
- private int processConnections(Iterator<H> handlers) {
- int nHandles = 0;
-
- // Loop on each connection request
- while (handlers.hasNext()) {
- H handle = handlers.next();
- handlers.remove();
-
- ConnectionRequest connectionRequest = getConnectionRequest(handle);
-
- if (connectionRequest == null) {
- continue;
- }
-
- boolean success = false;
- try {
- if (finishConnect(handle)) {
- T session = newSession(processor, handle);
- initSession(session, connectionRequest, connectionRequest.getSessionInitializer());
- // Forward the remaining process to the IoProcessor.
- session.getProcessor().add(session);
- nHandles++;
- }
- success = true;
- } catch (Exception e) {
- connectionRequest.setException(e);
- } finally {
- if (!success) {
- // The connection failed, we have to cancel it.
- cancelQueue.offer(connectionRequest);
- }
- }
- }
- return nHandles;
- }
-
- private void processTimedOutSessions(Iterator<H> handles) {
- long currentTime = System.currentTimeMillis();
-
- while (handles.hasNext()) {
- H handle = handles.next();
- ConnectionRequest connectionRequest = getConnectionRequest(handle);
-
- if ((connectionRequest != null) && (currentTime >= connectionRequest.deadline)) {
- connectionRequest.setException(new ConnectException("Connection timed out."));
- cancelQueue.offer(connectionRequest);
- }
- }
- }
-
private class Connector implements Runnable {
-
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void run() {
- assert (connectorRef.get() == this);
+ assert connectorRef.get() == this;
int nHandles = 0;
@@ -541,16 +440,16 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
connectorRef.set(null);
if (connectQueue.isEmpty()) {
- assert (connectorRef.get() != this);
+ assert connectorRef.get() != this;
break;
}
if (!connectorRef.compareAndSet(null, this)) {
- assert (connectorRef.get() != this);
+ assert connectorRef.get() != this;
break;
}
- assert (connectorRef.get() == this);
+ assert connectorRef.get() == this;
}
if (selected > 0) {
@@ -596,8 +495,117 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
}
}
}
+
+ private int registerNew() {
+ int nHandles = 0;
+ for (;;) {
+ ConnectionRequest req = connectQueue.poll();
+ if (req == null) {
+ break;
+ }
+
+ H handle = req.handle;
+ try {
+ register(handle, req);
+ nHandles++;
+ } catch (Exception e) {
+ req.setException(e);
+ try {
+ close(handle);
+ } catch (Exception e2) {
+ ExceptionMonitor.getInstance().exceptionCaught(e2);
+ }
+ }
+ }
+ return nHandles;
+ }
+
+ private int cancelKeys() {
+ int nHandles = 0;
+
+ for (;;) {
+ ConnectionRequest req = cancelQueue.poll();
+
+ if (req == null) {
+ break;
+ }
+
+ H handle = req.handle;
+
+ try {
+ close(handle);
+ } catch (Exception e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ } finally {
+ nHandles++;
+ }
+ }
+
+ if (nHandles > 0) {
+ wakeup();
+ }
+
+ return nHandles;
+ }
+
+ /**
+ * Process the incoming connections, creating a new session for each valid
+ * connection.
+ */
+ private int processConnections(Iterator<H> handlers) {
+ int nHandles = 0;
+
+ // Loop on each connection request
+ while (handlers.hasNext()) {
+ H handle = handlers.next();
+ handlers.remove();
+
+ ConnectionRequest connectionRequest = getConnectionRequest(handle);
+
+ if (connectionRequest == null) {
+ continue;
+ }
+
+ boolean success = false;
+ try {
+ if (finishConnect(handle)) {
+ S session = newSession(processor, handle);
+ initSession(session, connectionRequest, connectionRequest.getSessionInitializer());
+ // Forward the remaining process to the IoProcessor.
+ session.getProcessor().add(session);
+ nHandles++;
+ }
+ success = true;
+ } catch (Exception e) {
+ connectionRequest.setException(e);
+ } finally {
+ if (!success) {
+ // The connection failed, we have to cancel it.
+ cancelQueue.offer(connectionRequest);
+ }
+ }
+ }
+ return nHandles;
+ }
+
+ private void processTimedOutSessions(Iterator<H> handles) {
+ long currentTime = System.currentTimeMillis();
+
+ while (handles.hasNext()) {
+ H handle = handles.next();
+ ConnectionRequest connectionRequest = getConnectionRequest(handle);
+
+ if ((connectionRequest != null) && (currentTime >= connectionRequest.deadline)) {
+ connectionRequest.setException(new ConnectException("Connection timed out."));
+ cancelQueue.offer(connectionRequest);
+ }
+ }
+ }
}
+ /**
+ * A ConnectionRequest's Iouture
+ */
public final class ConnectionRequest extends DefaultConnectFuture {
/** The handle associated with this connection request */
private final H handle;
@@ -608,6 +616,12 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
/** The callback to call when the session is initialized */
private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer;
+ /**
+ * Creates a new ConnectionRequest instance
+ *
+ * @param handle The IoHander
+ * @param callback The IoFuture callback
+ */
public ConnectionRequest(H handle, IoSessionInitializer<? extends ConnectFuture> callback) {
this.handle = handle;
long timeout = getConnectTimeoutMillis();
@@ -621,18 +635,30 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
this.sessionInitializer = callback;
}
+ /**
+ * @return The IoHandler instance
+ */
public H getHandle() {
return handle;
}
+ /**
+ * @return The connection deadline
+ */
public long getDeadline() {
return deadline;
}
+ /**
+ * @return The session initializer callback
+ */
public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() {
return sessionInitializer;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public boolean cancel() {
if (!isDone()) {
http://git-wip-us.apache.org/repos/asf/mina/blob/9b26714b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
index 853b8a3..48794e6 100644
--- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
+++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
@@ -223,7 +223,8 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
* Say if the list of {@link IoSession} polled by this {@link IoProcessor}
* is empty
*
- * @return <tt>true</tt> if at least a session is managed by this {@link IoProcessor}
+ * @return <tt>true</tt> if at least a session is managed by this
+ * {@link IoProcessor}
*/
protected abstract boolean isSelectorEmpty();
@@ -251,16 +252,17 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
/**
* Get the state of a session (One of OPENING, OPEN, CLOSING)
*
- * @param session the {@link IoSession} to inspect
+ * @param session
+ * the {@link IoSession} to inspect
* @return the state of the session
*/
protected abstract SessionState getState(S session);
-
-
+
/**
* Tells if the session ready for writing
*
- * @param session the queried session
+ * @param session
+ * the queried session
* @return <tt>true</tt> is ready, <tt>false</tt> if not ready
*/
protected abstract boolean isWritable(S session);
@@ -268,7 +270,8 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
/**
* Tells if the session ready for reading
*
- * @param session the queried session
+ * @param session
+ * the queried session
* @return <tt>true</tt> is ready, <tt>false</tt> if not ready
*/
protected abstract boolean isReadable(S session);
@@ -276,25 +279,32 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
/**
* Set the session to be informed when a write event should be processed
*
- * @param session the session for which we want to be interested in write events
- * @param isInterested <tt>true</tt> for registering, <tt>false</tt> for removing
- * @throws Exception If there was a problem while registering the session
+ * @param session
+ * the session for which we want to be interested in write events
+ * @param isInterested
+ * <tt>true</tt> for registering, <tt>false</tt> for removing
+ * @throws Exception
+ * If there was a problem while registering the session
*/
protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;
/**
* Set the session to be informed when a read event should be processed
*
- * @param session the session for which we want to be interested in read events
- * @param isInterested <tt>true</tt> for registering, <tt>false</tt> for removing
- * @throws Exception If there was a problem while registering the session
+ * @param session
+ * the session for which we want to be interested in read events
+ * @param isInterested
+ * <tt>true</tt> for registering, <tt>false</tt> for removing
+ * @throws Exception
+ * If there was a problem while registering the session
*/
protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;
/**
* Tells if this session is registered for reading
*
- * @param session the queried session
+ * @param session
+ * the queried session
* @return <tt>true</tt> is registered for reading
*/
protected abstract boolean isInterestedInRead(S session);
@@ -302,7 +312,8 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
/**
* Tells if this session is registered for writing
*
- * @param session the queried session
+ * @param session
+ * the queried session
* @return <tt>true</tt> is registered for writing
*/
protected abstract boolean isInterestedInWrite(S session);
@@ -310,16 +321,20 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
/**
* Initialize the polling of a session. Add it to the polling process.
*
- * @param session the {@link IoSession} to add to the polling
- * @throws Exception any exception thrown by the underlying system calls
+ * @param session
+ * the {@link IoSession} to add to the polling
+ * @throws Exception
+ * any exception thrown by the underlying system calls
*/
protected abstract void init(S session) throws Exception;
/**
* Destroy the underlying client socket handle
*
- * @param session the {@link IoSession}
- * @throws Exception any exception thrown by the underlying system calls
+ * @param session
+ * the {@link IoSession}
+ * @throws Exception
+ * any exception thrown by the underlying system calls
*/
protected abstract void destroy(S session) throws Exception;
@@ -327,10 +342,13 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
* Reads a sequence of bytes from a {@link IoSession} into the given
* {@link IoBuffer}. Is called when the session was found ready for reading.
*
- * @param session the session to read
- * @param buf the buffer to fill
+ * @param session
+ * the session to read
+ * @param buf
+ * the buffer to fill
* @return the number of bytes read
- * @throws Exception any exception thrown by the underlying system calls
+ * @throws Exception
+ * any exception thrown by the underlying system calls
*/
protected abstract int read(S session, IoBuffer buf) throws Exception;
@@ -338,12 +356,16 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
* Write a sequence of bytes to a {@link IoSession}, means to be called when
* a session was found ready for writing.
*
- * @param session the session to write
- * @param buf the buffer to write
- * @param length the number of bytes to write can be superior to the number of
+ * @param session
+ * the session to write
+ * @param buf
+ * the buffer to write
+ * @param length
+ * the number of bytes to write can be superior to the number of
* bytes remaining in the buffer
* @return the number of byte written
- * @throws IOException any exception thrown by the underlying system calls
+ * @throws IOException
+ * any exception thrown by the underlying system calls
*/
protected abstract int write(S session, IoBuffer buf, int length) throws IOException;
@@ -353,11 +375,15 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
* {@link UnsupportedOperationException} so the file will be send using
* usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
*
- * @param session the session to write
- * @param region the file region to write
- * @param length the length of the portion to send
+ * @param session
+ * the session to write
+ * @param region
+ * the file region to write
+ * @param length
+ * the length of the portion to send
* @return the number of written bytes
- * @throws Exception any exception thrown by the underlying system calls
+ * @throws Exception
+ * any exception thrown by the underlying system calls
*/
protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;
@@ -417,18 +443,11 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
}
}
- private void scheduleFlush(S session) {
- // add the session to the queue if it's not already
- // in the queue
- if (session.setScheduledForFlush(true)) {
- flushingSessions.add(session);
- }
- }
-
/**
* Updates the traffic mask for a given session
*
- * @param session the session to update
+ * @param session
+ * the session to update
*/
public final void updateTrafficMask(S session) {
trafficControllingSessions.add(session);
@@ -460,7 +479,8 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
* trash the buggy selector and create a new one, registring all the sockets
* on it.
*
- * @throws IOException If we got an exception
+ * @throws IOException
+ * If we got an exception
*/
protected abstract void registerNewSelector() throws IOException;
@@ -470,202 +490,11 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
* have to loop.
*
* @return <tt>true</tt> if a connection has been brutally closed.
- * @throws IOException If we got an exception
+ * @throws IOException
+ * If we got an exception
*/
protected abstract boolean isBrokenConnection() throws IOException;
- /**
- * Loops over the new sessions blocking queue and returns the number of
- * sessions which are effectively created
- *
- * @return The number of new sessions
- */
- private int handleNewSessions() {
- int addedSessions = 0;
-
- for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
- if (addNow(session)) {
- // A new session has been created
- addedSessions++;
- }
- }
-
- return addedSessions;
- }
-
- /**
- * Process a new session : - initialize it - create its chain - fire the
- * CREATED listeners if any
- *
- * @param session The session to create
- * @return <tt>true</tt> if the session has been registered
- */
- private boolean addNow(S session) {
- boolean registered = false;
-
- try {
- init(session);
- registered = true;
-
- // Build the filter chain of this session.
- IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
- chainBuilder.buildFilterChain(session.getFilterChain());
-
- // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
- // in AbstractIoFilterChain.fireSessionOpened().
- // Propagate the SESSION_CREATED event up to the chain
- IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
- listeners.fireSessionCreated(session);
- } catch (Exception e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
-
- try {
- destroy(session);
- } catch (Exception e1) {
- ExceptionMonitor.getInstance().exceptionCaught(e1);
- } finally {
- registered = false;
- }
- }
-
- return registered;
- }
-
- private int removeSessions() {
- int removedSessions = 0;
-
- for (S session = removingSessions.poll(); session != null;session = removingSessions.poll()) {
- SessionState state = getState(session);
-
- // Now deal with the removal accordingly to the session's state
- switch (state) {
- case OPENED:
- // Try to remove this session
- if (removeNow(session)) {
- removedSessions++;
- }
-
- break;
-
- case CLOSING:
- // Skip if channel is already closed
- // In any case, remove the session from the queue
- removedSessions++;
- break;
-
- case OPENING:
- // Remove session from the newSessions queue and
- // remove it
- newSessions.remove(session);
-
- if (removeNow(session)) {
- removedSessions++;
- }
-
- break;
-
- default:
- throw new IllegalStateException(String.valueOf(state));
- }
- }
-
- return removedSessions;
- }
-
- private boolean removeNow(S session) {
- clearWriteRequestQueue(session);
-
- try {
- destroy(session);
- return true;
- } catch (Exception e) {
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireExceptionCaught(e);
- } finally {
- try {
- clearWriteRequestQueue(session);
- ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
- } catch (Exception e) {
- // The session was either destroyed or not at this point.
- // We do not want any exception thrown from this "cleanup" code to change
- // the return value by bubbling up.
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireExceptionCaught(e);
- }
- }
-
- return false;
- }
-
- private void clearWriteRequestQueue(S session) {
- WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
- WriteRequest req;
-
- List<WriteRequest> failedRequests = new ArrayList<>();
-
- if ((req = writeRequestQueue.poll(session)) != null) {
- Object message = req.getMessage();
-
- if (message instanceof IoBuffer) {
- IoBuffer buf = (IoBuffer) message;
-
- // The first unwritten empty buffer must be
- // forwarded to the filter chain.
- if (buf.hasRemaining()) {
- buf.reset();
- failedRequests.add(req);
- } else {
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireMessageSent(req);
- }
- } else {
- failedRequests.add(req);
- }
-
- // Discard others.
- while ((req = writeRequestQueue.poll(session)) != null) {
- failedRequests.add(req);
- }
- }
-
- // Create an exception and notify.
- if (!failedRequests.isEmpty()) {
- WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
-
- for (WriteRequest r : failedRequests) {
- session.decreaseScheduledBytesAndMessages(r);
- r.getFuture().setException(cause);
- }
-
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireExceptionCaught(cause);
- }
- }
-
- private void process() throws Exception {
- for (Iterator<S> i = selectedSessions(); i.hasNext();) {
- S session = i.next();
- process(session);
- i.remove();
- }
- }
-
- /**
- * Deal with session ready for the read or write operations, or both.
- */
- private void process(S session) {
- // Process Reads
- if (isReadable(session) && !session.isReadSuspended()) {
- read(session);
- }
-
- // Process writes
- if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) {
- // add the session to the queue, if it's not already there
- flushingSessions.add(session);
- }
- }
-
private void read(S session) {
IoSessionConfig config = session.getConfig();
int bufferSize = config.getReadBufferSize();
@@ -717,12 +546,11 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
filterChain.fireInputClosed();
}
} catch (Exception e) {
- if (e instanceof IOException) {
- if (!(e instanceof PortUnreachableException)
+ if ((e instanceof IOException) &&
+ (!(e instanceof PortUnreachableException)
|| !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
- || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
- scheduleRemove(session);
- }
+ || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable())) {
+ scheduleRemove(session);
}
IoFilterChain filterChain = session.getFilterChain();
@@ -730,306 +558,6 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
}
}
- private void notifyIdleSessions(long currentTime) throws Exception {
- // process idle sessions
- if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
- lastIdleCheckTime = currentTime;
- AbstractIoSession.notifyIdleness(allSessions(), currentTime);
- }
- }
-
- /**
- * Write all the pending messages
- */
- private void flush(long currentTime) {
- if (flushingSessions.isEmpty()) {
- return;
- }
-
- do {
- S session = flushingSessions.poll(); // the same one with
- // firstSession
-
- if (session == null) {
- // Just in case ... It should not happen.
- break;
- }
-
- // Reset the Schedule for flush flag for this session,
- // as we are flushing it now
- session.unscheduledForFlush();
-
- SessionState state = getState(session);
-
- switch (state) {
- case OPENED:
- try {
- boolean flushedAll = flushNow(session, currentTime);
-
- if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
- && !session.isScheduledForFlush()) {
- scheduleFlush(session);
- }
- } catch (Exception e) {
- scheduleRemove(session);
- session.closeNow();
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireExceptionCaught(e);
- }
-
- break;
-
- case CLOSING:
- // Skip if the channel is already closed.
- break;
-
- case OPENING:
- // Retry later if session is not yet fully initialized.
- // (In case that Session.write() is called before addSession()
- // is processed)
- scheduleFlush(session);
- return;
-
- default:
- throw new IllegalStateException(String.valueOf(state));
- }
-
- } while (!flushingSessions.isEmpty());
- }
-
- private boolean flushNow(S session, long currentTime) {
- if (!session.isConnected()) {
- scheduleRemove(session);
- return false;
- }
-
- final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
-
- final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
-
- // Set limitation for the number of written bytes for read-write
- // fairness. I used maxReadBufferSize * 3 / 2, which yields best
- // performance in my experience while not breaking fairness much.
- final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
- + (session.getConfig().getMaxReadBufferSize() >>> 1);
- int writtenBytes = 0;
- WriteRequest req = null;
-
- try {
- // Clear OP_WRITE
- setInterestedInWrite(session, false);
-
- do {
- // Check for pending writes.
- req = session.getCurrentWriteRequest();
-
- if (req == null) {
- req = writeRequestQueue.poll(session);
-
- if (req == null) {
- break;
- }
-
- session.setCurrentWriteRequest(req);
- }
-
- int localWrittenBytes;
- Object message = req.getMessage();
-
- if (message instanceof IoBuffer) {
- localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
- currentTime);
-
- if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
- // the buffer isn't empty, we re-interest it in writing
- writtenBytes += localWrittenBytes;
- setInterestedInWrite(session, true);
- return false;
- }
- } else if (message instanceof FileRegion) {
- localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
- currentTime);
-
- // Fix for Java bug on Linux
- // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
- // If there's still data to be written in the FileRegion,
- // return 0 indicating that we need
- // to pause until writing may resume.
- if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
- writtenBytes += localWrittenBytes;
- setInterestedInWrite(session, true);
- return false;
- }
- } else {
- throw new IllegalStateException("Don't know how to handle message of type '"
- + message.getClass().getName() + "'. Are you missing a protocol encoder?");
- }
-
- if (localWrittenBytes == 0) {
-
- // Kernel buffer is full.
- if (!req.equals(AbstractIoSession.MESSAGE_SENT_REQUEST)) {
- setInterestedInWrite(session, true);
- return false;
- }
- } else {
- writtenBytes += localWrittenBytes;
-
- if (writtenBytes >= maxWrittenBytes) {
- // Wrote too much
- scheduleFlush(session);
- return false;
- }
- }
-
- if (message instanceof IoBuffer) {
- ((IoBuffer) message).free();
- }
- } while (writtenBytes < maxWrittenBytes);
- } catch (Exception e) {
- if (req != null) {
- req.getFuture().setException(e);
- }
-
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireExceptionCaught(e);
- return false;
- }
-
- return true;
- }
-
- private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
- throws Exception {
- IoBuffer buf = (IoBuffer) req.getMessage();
- int localWrittenBytes = 0;
-
- if (buf.hasRemaining()) {
- int length;
-
- if (hasFragmentation) {
- length = Math.min(buf.remaining(), maxLength);
- } else {
- length = buf.remaining();
- }
-
- try {
- localWrittenBytes = write(session, buf, length);
- } catch (IOException ioe) {
- // We have had an issue while trying to send data to the
- // peer : let's close the session.
- buf.free();
- session.closeNow();
- removeNow(session);
-
- return 0;
- }
- }
-
- session.increaseWrittenBytes(localWrittenBytes, currentTime);
-
- // Now, forward the original message
- if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
- // Buffer has been sent, clear the current request.
- Object originalMessage = req.getOriginalRequest().getMessage();
-
- if (originalMessage instanceof IoBuffer) {
- buf = ((IoBuffer)req.getOriginalRequest().getMessage());
-
- int pos = buf.position();
- buf.reset();
- fireMessageSent(session, req);
- // And set it back to its position
- buf.position(pos);
- } else {
- fireMessageSent(session, req);
- }
- }
-
- return localWrittenBytes;
- }
-
- private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
- throws Exception {
- int localWrittenBytes;
- FileRegion region = (FileRegion) req.getMessage();
-
- if (region.getRemainingBytes() > 0) {
- int length;
-
- if (hasFragmentation) {
- length = (int) Math.min(region.getRemainingBytes(), maxLength);
- } else {
- length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
- }
-
- localWrittenBytes = transferFile(session, region, length);
- region.update(localWrittenBytes);
- } else {
- localWrittenBytes = 0;
- }
-
- session.increaseWrittenBytes(localWrittenBytes, currentTime);
-
- if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
- fireMessageSent(session, req);
- }
-
- return localWrittenBytes;
- }
-
- private void fireMessageSent(S session, WriteRequest req) {
- session.setCurrentWriteRequest(null);
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireMessageSent(req);
- }
-
- /**
- * Update the trafficControl for all the session.
- */
- private void updateTrafficMask() {
- int queueSize = trafficControllingSessions.size();
-
- while (queueSize > 0) {
- S session = trafficControllingSessions.poll();
-
- if (session == null) {
- // We are done with this queue.
- return;
- }
-
- SessionState state = getState(session);
-
- switch (state) {
- case OPENED:
- updateTrafficControl(session);
-
- break;
-
- case CLOSING:
- break;
-
- case OPENING:
- // Retry later if session is not yet fully initialized.
- // (In case that Session.suspend??() or session.resume??() is
- // called before addSession() is processed)
- // We just put back the session at the end of the queue.
- trafficControllingSessions.add(session);
- break;
-
- default:
- throw new IllegalStateException(String.valueOf(state));
- }
-
- // As we have handled one session, decrement the number of
- // remaining sessions. The OPENING session will be processed
- // with the next select(), as the queue size has been decreased,
- // even
- // if the session has been pushed at the end of the queue
- queueSize--;
- }
- }
-
/**
* {@inheritDoc}
*/
@@ -1058,8 +586,12 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
* sessions -
*/
private class Processor implements Runnable {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void run() {
- assert (processorRef.get() == this);
+ assert processorRef.get() == this;
int nSessions = 0;
lastIdleCheckTime = System.currentTimeMillis();
@@ -1137,31 +669,31 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
if (newSessions.isEmpty() && isSelectorEmpty()) {
// newSessions.add() precedes startupProcessor
- assert (processorRef.get() != this);
+ assert processorRef.get() != this;
break;
}
- assert (processorRef.get() != this);
+ assert processorRef.get() != this;
if (!processorRef.compareAndSet(null, this)) {
// startupProcessor won race, so must exit processor
- assert (processorRef.get() != this);
+ assert processorRef.get() != this;
break;
}
- assert (processorRef.get() == this);
+ assert processorRef.get() == this;
}
// Disconnect all sessions immediately if disposal has been
// requested so that we exit this loop eventually.
if (isDisposing()) {
boolean hasKeys = false;
-
+
for (Iterator<S> i = allSessions(); i.hasNext();) {
IoSession session = i.next();
-
+
if (session.isActive()) {
- scheduleRemove((S)session);
+ scheduleRemove((S) session);
hasKeys = true;
}
}
@@ -1198,5 +730,507 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
disposalFuture.setValue(true);
}
}
+
+ /**
+ * Loops over the new sessions blocking queue and returns the number of
+ * sessions which are effectively created
+ *
+ * @return The number of new sessions
+ */
+ private int handleNewSessions() {
+ int addedSessions = 0;
+
+ for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
+ if (addNow(session)) {
+ // A new session has been created
+ addedSessions++;
+ }
+ }
+
+ return addedSessions;
+ }
+
+ private void notifyIdleSessions(long currentTime) throws Exception {
+ // process idle sessions
+ if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
+ lastIdleCheckTime = currentTime;
+ AbstractIoSession.notifyIdleness(allSessions(), currentTime);
+ }
+ }
+
+ /**
+ * Update the trafficControl for all the session.
+ */
+ private void updateTrafficMask() {
+ int queueSize = trafficControllingSessions.size();
+
+ while (queueSize > 0) {
+ S session = trafficControllingSessions.poll();
+
+ if (session == null) {
+ // We are done with this queue.
+ return;
+ }
+
+ SessionState state = getState(session);
+
+ switch (state) {
+ case OPENED:
+ updateTrafficControl(session);
+
+ break;
+
+ case CLOSING:
+ break;
+
+ case OPENING:
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.suspend??() or session.resume??() is
+ // called before addSession() is processed)
+ // We just put back the session at the end of the queue.
+ trafficControllingSessions.add(session);
+ break;
+
+ default:
+ throw new IllegalStateException(String.valueOf(state));
+ }
+
+ // As we have handled one session, decrement the number of
+ // remaining sessions. The OPENING session will be processed
+ // with the next select(), as the queue size has been decreased,
+ // even
+ // if the session has been pushed at the end of the queue
+ queueSize--;
+ }
+ }
+
+ /**
+ * Process a new session : - initialize it - create its chain - fire the
+ * CREATED listeners if any
+ *
+ * @param session
+ * The session to create
+ * @return <tt>true</tt> if the session has been registered
+ */
+ private boolean addNow(S session) {
+ boolean registered = false;
+
+ try {
+ init(session);
+ registered = true;
+
+ // Build the filter chain of this session.
+ IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
+ chainBuilder.buildFilterChain(session.getFilterChain());
+
+ // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
+ // in AbstractIoFilterChain.fireSessionOpened().
+ // Propagate the SESSION_CREATED event up to the chain
+ IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
+ listeners.fireSessionCreated(session);
+ } catch (Exception e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+
+ try {
+ destroy(session);
+ } catch (Exception e1) {
+ ExceptionMonitor.getInstance().exceptionCaught(e1);
+ } finally {
+ registered = false;
+ }
+ }
+
+ return registered;
+ }
+
+ private int removeSessions() {
+ int removedSessions = 0;
+
+ for (S session = removingSessions.poll(); session != null; session = removingSessions.poll()) {
+ SessionState state = getState(session);
+
+ // Now deal with the removal accordingly to the session's state
+ switch (state) {
+ case OPENED:
+ // Try to remove this session
+ if (removeNow(session)) {
+ removedSessions++;
+ }
+
+ break;
+
+ case CLOSING:
+ // Skip if channel is already closed
+ // In any case, remove the session from the queue
+ removedSessions++;
+ break;
+
+ case OPENING:
+ // Remove session from the newSessions queue and
+ // remove it
+ newSessions.remove(session);
+
+ if (removeNow(session)) {
+ removedSessions++;
+ }
+
+ break;
+
+ default:
+ throw new IllegalStateException(String.valueOf(state));
+ }
+ }
+
+ return removedSessions;
+ }
+
+ /**
+ * Write all the pending messages
+ */
+ private void flush(long currentTime) {
+ if (flushingSessions.isEmpty()) {
+ return;
+ }
+
+ do {
+ S session = flushingSessions.poll(); // the same one with
+ // firstSession
+
+ if (session == null) {
+ // Just in case ... It should not happen.
+ break;
+ }
+
+ // Reset the Schedule for flush flag for this session,
+ // as we are flushing it now
+ session.unscheduledForFlush();
+
+ SessionState state = getState(session);
+
+ switch (state) {
+ case OPENED:
+ try {
+ boolean flushedAll = flushNow(session, currentTime);
+
+ if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
+ && !session.isScheduledForFlush()) {
+ scheduleFlush(session);
+ }
+ } catch (Exception e) {
+ scheduleRemove(session);
+ session.closeNow();
+ IoFilterChain filterChain = session.getFilterChain();
+ filterChain.fireExceptionCaught(e);
+ }
+
+ break;
+
+ case CLOSING:
+ // Skip if the channel is already closed.
+ break;
+
+ case OPENING:
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.write() is called before addSession()
+ // is processed)
+ scheduleFlush(session);
+ return;
+
+ default:
+ throw new IllegalStateException(String.valueOf(state));
+ }
+
+ } while (!flushingSessions.isEmpty());
+ }
+
+ private boolean flushNow(S session, long currentTime) {
+ if (!session.isConnected()) {
+ scheduleRemove(session);
+ return false;
+ }
+
+ final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
+
+ final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
+
+ // Set limitation for the number of written bytes for read-write
+ // fairness. I used maxReadBufferSize * 3 / 2, which yields best
+ // performance in my experience while not breaking fairness much.
+ final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
+ + (session.getConfig().getMaxReadBufferSize() >>> 1);
+ int writtenBytes = 0;
+ WriteRequest req = null;
+
+ try {
+ // Clear OP_WRITE
+ setInterestedInWrite(session, false);
+
+ do {
+ // Check for pending writes.
+ req = session.getCurrentWriteRequest();
+
+ if (req == null) {
+ req = writeRequestQueue.poll(session);
+
+ if (req == null) {
+ break;
+ }
+
+ session.setCurrentWriteRequest(req);
+ }
+
+ int localWrittenBytes;
+ Object message = req.getMessage();
+
+ if (message instanceof IoBuffer) {
+ localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
+ currentTime);
+
+ if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
+ // the buffer isn't empty, we re-interest it in writing
+ setInterestedInWrite(session, true);
+
+ return false;
+ }
+ } else if (message instanceof FileRegion) {
+ localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
+ currentTime);
+
+ // Fix for Java bug on Linux
+ // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
+ // If there's still data to be written in the FileRegion,
+ // return 0 indicating that we need
+ // to pause until writing may resume.
+ if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
+ setInterestedInWrite(session, true);
+
+ return false;
+ }
+ } else {
+ throw new IllegalStateException("Don't know how to handle message of type '"
+ + message.getClass().getName() + "'. Are you missing a protocol encoder?");
+ }
+
+ if (localWrittenBytes == 0) {
+
+ // Kernel buffer is full.
+ if (!req.equals(AbstractIoSession.MESSAGE_SENT_REQUEST)) {
+ setInterestedInWrite(session, true);
+ return false;
+ }
+ } else {
+ writtenBytes += localWrittenBytes;
+
+ if (writtenBytes >= maxWrittenBytes) {
+ // Wrote too much
+ scheduleFlush(session);
+ return false;
+ }
+ }
+
+ if (message instanceof IoBuffer) {
+ ((IoBuffer) message).free();
+ }
+ } while (writtenBytes < maxWrittenBytes);
+ } catch (Exception e) {
+ if (req != null) {
+ req.getFuture().setException(e);
+ }
+
+ IoFilterChain filterChain = session.getFilterChain();
+ filterChain.fireExceptionCaught(e);
+ return false;
+ }
+
+ return true;
+ }
+
+ private void scheduleFlush(S session) {
+ // add the session to the queue if it's not already
+ // in the queue
+ if (session.setScheduledForFlush(true)) {
+ flushingSessions.add(session);
+ }
+ }
+
+ private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
+ throws Exception {
+ int localWrittenBytes;
+ FileRegion region = (FileRegion) req.getMessage();
+
+ if (region.getRemainingBytes() > 0) {
+ int length;
+
+ if (hasFragmentation) {
+ length = (int) Math.min(region.getRemainingBytes(), maxLength);
+ } else {
+ length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
+ }
+
+ localWrittenBytes = transferFile(session, region, length);
+ region.update(localWrittenBytes);
+ } else {
+ localWrittenBytes = 0;
+ }
+
+ session.increaseWrittenBytes(localWrittenBytes, currentTime);
+
+ if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
+ fireMessageSent(session, req);
+ }
+
+ return localWrittenBytes;
+ }
+
+ private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
+ throws Exception {
+ IoBuffer buf = (IoBuffer) req.getMessage();
+ int localWrittenBytes = 0;
+
+ if (buf.hasRemaining()) {
+ int length;
+
+ if (hasFragmentation) {
+ length = Math.min(buf.remaining(), maxLength);
+ } else {
+ length = buf.remaining();
+ }
+
+ try {
+ localWrittenBytes = write(session, buf, length);
+ } catch (IOException ioe) {
+ // We have had an issue while trying to send data to the
+ // peer : let's close the session.
+ buf.free();
+ session.closeNow();
+ removeNow(session);
+
+ return 0;
+ }
+ }
+
+ session.increaseWrittenBytes(localWrittenBytes, currentTime);
+
+ // Now, forward the original message
+ if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
+ // Buffer has been sent, clear the current request.
+ Object originalMessage = req.getOriginalRequest().getMessage();
+
+ if (originalMessage instanceof IoBuffer) {
+ buf = (IoBuffer) req.getOriginalRequest().getMessage();
+
+ int pos = buf.position();
+ buf.reset();
+ fireMessageSent(session, req);
+ // And set it back to its position
+ buf.position(pos);
+ } else {
+ fireMessageSent(session, req);
+ }
+ }
+
+ return localWrittenBytes;
+ }
+
+ private boolean removeNow(S session) {
+ clearWriteRequestQueue(session);
+
+ try {
+ destroy(session);
+ return true;
+ } catch (Exception e) {
+ IoFilterChain filterChain = session.getFilterChain();
+ filterChain.fireExceptionCaught(e);
+ } finally {
+ try {
+ clearWriteRequestQueue(session);
+ ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
+ } catch (Exception e) {
+ // The session was either destroyed or not at this point.
+ // We do not want any exception thrown from this "cleanup" code
+ // to change
+ // the return value by bubbling up.
+ IoFilterChain filterChain = session.getFilterChain();
+ filterChain.fireExceptionCaught(e);
+ }
+ }
+
+ return false;
+ }
+
+ private void clearWriteRequestQueue(S session) {
+ WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
+ WriteRequest req;
+
+ List<WriteRequest> failedRequests = new ArrayList<>();
+
+ if ((req = writeRequestQueue.poll(session)) != null) {
+ Object message = req.getMessage();
+
+ if (message instanceof IoBuffer) {
+ IoBuffer buf = (IoBuffer) message;
+
+ // The first unwritten empty buffer must be
+ // forwarded to the filter chain.
+ if (buf.hasRemaining()) {
+ buf.reset();
+ failedRequests.add(req);
+ } else {
+ IoFilterChain filterChain = session.getFilterChain();
+ filterChain.fireMessageSent(req);
+ }
+ } else {
+ failedRequests.add(req);
+ }
+
+ // Discard others.
+ while ((req = writeRequestQueue.poll(session)) != null) {
+ failedRequests.add(req);
+ }
+ }
+
+ // Create an exception and notify.
+ if (!failedRequests.isEmpty()) {
+ WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
+
+ for (WriteRequest r : failedRequests) {
+ session.decreaseScheduledBytesAndMessages(r);
+ r.getFuture().setException(cause);
+ }
+
+ IoFilterChain filterChain = session.getFilterChain();
+ filterChain.fireExceptionCaught(cause);
+ }
+ }
+
+ private void fireMessageSent(S session, WriteRequest req) {
+ session.setCurrentWriteRequest(null);
+ IoFilterChain filterChain = session.getFilterChain();
+ filterChain.fireMessageSent(req);
+ }
+
+ private void process() throws Exception {
+ for (Iterator<S> i = selectedSessions(); i.hasNext();) {
+ S session = i.next();
+ process(session);
+ i.remove();
+ }
+ }
+
+ /**
+ * Deal with session ready for the read or write operations, or both.
+ */
+ private void process(S session) {
+ // Process Reads
+ if (isReadable(session) && !session.isReadSuspended()) {
+ read(session);
+ }
+
+ // Process writes
+ if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) {
+ // add the session to the queue, if it's not already there
+ flushingSessions.add(session);
+ }
+ }
}
}
[3/4] mina git commit: Fixed some Soarlint warnings
Posted by el...@apache.org.
Fixed some Soarlint warnings
Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/c87701fb
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/c87701fb
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/c87701fb
Branch: refs/heads/2.0
Commit: c87701fb188cef67fa500a17780a107f6c0a456e
Parents: 37239fd
Author: Emmanuel L�charny <el...@symas.com>
Authored: Wed Dec 7 11:43:43 2016 +0100
Committer: Emmanuel L�charny <el...@symas.com>
Committed: Wed Dec 7 11:43:43 2016 +0100
----------------------------------------------------------------------
.../socket/AbstractDatagramSessionConfig.java | 2 ++
.../mina/transport/socket/DatagramAcceptor.java | 3 +++
.../transport/socket/DatagramConnector.java | 2 ++
.../socket/DefaultDatagramSessionConfig.java | 26 +++++++++++++++++++-
.../mina/transport/socket/SocketAcceptor.java | 3 +++
.../mina/transport/socket/SocketConnector.java | 2 ++
.../mina/transport/socket/nio/NioSession.java | 6 +++++
.../socket/nio/NioSocketConnector.java | 6 +++++
.../transport/socket/nio/NioSocketSession.java | 1 +
9 files changed, 50 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina/blob/c87701fb/mina-core/src/main/java/org/apache/mina/transport/socket/AbstractDatagramSessionConfig.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/AbstractDatagramSessionConfig.java b/mina-core/src/main/java/org/apache/mina/transport/socket/AbstractDatagramSessionConfig.java
index 67daf74..0ef1b9f 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/AbstractDatagramSessionConfig.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/AbstractDatagramSessionConfig.java
@@ -136,6 +136,7 @@ public abstract class AbstractDatagramSessionConfig extends AbstractIoSessionCon
/**
* {@inheritDoc}
*/
+ @Override
public boolean isCloseOnPortUnreachable() {
return closeOnPortUnreachable;
}
@@ -143,6 +144,7 @@ public abstract class AbstractDatagramSessionConfig extends AbstractIoSessionCon
/**
* {@inheritDoc}
*/
+ @Override
public void setCloseOnPortUnreachable(boolean closeOnPortUnreachable) {
this.closeOnPortUnreachable = closeOnPortUnreachable;
}
http://git-wip-us.apache.org/repos/asf/mina/blob/c87701fb/mina-core/src/main/java/org/apache/mina/transport/socket/DatagramAcceptor.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/DatagramAcceptor.java b/mina-core/src/main/java/org/apache/mina/transport/socket/DatagramAcceptor.java
index fba0319..bc0c687 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/DatagramAcceptor.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/DatagramAcceptor.java
@@ -38,12 +38,14 @@ public interface DatagramAcceptor extends IoAcceptor {
* necessarily the firstly bound address.
* This method overrides the {@link IoAcceptor#getLocalAddress()} method.
*/
+ @Override
InetSocketAddress getLocalAddress();
/**
* @return a {@link Set} of the local InetSocketAddress which are bound currently.
* This method overrides the {@link IoAcceptor#getDefaultLocalAddress()} method.
*/
+ @Override
InetSocketAddress getDefaultLocalAddress();
/**
@@ -72,5 +74,6 @@ public interface DatagramAcceptor extends IoAcceptor {
* @return the default Datagram configuration of the new {@link IoSession}s
* created by this service.
*/
+ @Override
DatagramSessionConfig getSessionConfig();
}
http://git-wip-us.apache.org/repos/asf/mina/blob/c87701fb/mina-core/src/main/java/org/apache/mina/transport/socket/DatagramConnector.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/DatagramConnector.java b/mina-core/src/main/java/org/apache/mina/transport/socket/DatagramConnector.java
index 02e5249..15ee056 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/DatagramConnector.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/DatagramConnector.java
@@ -34,12 +34,14 @@ public interface DatagramConnector extends IoConnector {
* is specified in {@link #connect()} method.
* This method overrides the {@link IoConnector#getDefaultRemoteAddress()} method.
*/
+ @Override
InetSocketAddress getDefaultRemoteAddress();
/**
* @return the default configuration of the new FatagramSessions created by
* this connect service.
*/
+ @Override
DatagramSessionConfig getSessionConfig();
/**
http://git-wip-us.apache.org/repos/asf/mina/blob/c87701fb/mina-core/src/main/java/org/apache/mina/transport/socket/DefaultDatagramSessionConfig.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/DefaultDatagramSessionConfig.java b/mina-core/src/main/java/org/apache/mina/transport/socket/DefaultDatagramSessionConfig.java
index 843e893..198d479 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/DefaultDatagramSessionConfig.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/DefaultDatagramSessionConfig.java
@@ -59,6 +59,7 @@ public class DefaultDatagramSessionConfig extends AbstractDatagramSessionConfig
/**
* @see DatagramSocket#getBroadcast()
*/
+ @Override
public boolean isBroadcast() {
return broadcast;
}
@@ -66,6 +67,7 @@ public class DefaultDatagramSessionConfig extends AbstractDatagramSessionConfig
/**
* @see DatagramSocket#setBroadcast(boolean)
*/
+ @Override
public void setBroadcast(boolean broadcast) {
this.broadcast = broadcast;
}
@@ -73,6 +75,7 @@ public class DefaultDatagramSessionConfig extends AbstractDatagramSessionConfig
/**
* @see DatagramSocket#getReuseAddress()
*/
+ @Override
public boolean isReuseAddress() {
return reuseAddress;
}
@@ -80,6 +83,7 @@ public class DefaultDatagramSessionConfig extends AbstractDatagramSessionConfig
/**
* @see DatagramSocket#setReuseAddress(boolean)
*/
+ @Override
public void setReuseAddress(boolean reuseAddress) {
this.reuseAddress = reuseAddress;
}
@@ -87,6 +91,7 @@ public class DefaultDatagramSessionConfig extends AbstractDatagramSessionConfig
/**
* @see DatagramSocket#getReceiveBufferSize()
*/
+ @Override
public int getReceiveBufferSize() {
return receiveBufferSize;
}
@@ -94,6 +99,7 @@ public class DefaultDatagramSessionConfig extends AbstractDatagramSessionConfig
/**
* @see DatagramSocket#setReceiveBufferSize(int)
*/
+ @Override
public void setReceiveBufferSize(int receiveBufferSize) {
this.receiveBufferSize = receiveBufferSize;
}
@@ -101,6 +107,7 @@ public class DefaultDatagramSessionConfig extends AbstractDatagramSessionConfig
/**
* @see DatagramSocket#getSendBufferSize()
*/
+ @Override
public int getSendBufferSize() {
return sendBufferSize;
}
@@ -108,6 +115,7 @@ public class DefaultDatagramSessionConfig extends AbstractDatagramSessionConfig
/**
* @see DatagramSocket#setSendBufferSize(int)
*/
+ @Override
public void setSendBufferSize(int sendBufferSize) {
this.sendBufferSize = sendBufferSize;
}
@@ -115,6 +123,7 @@ public class DefaultDatagramSessionConfig extends AbstractDatagramSessionConfig
/**
* @see DatagramSocket#getTrafficClass()
*/
+ @Override
public int getTrafficClass() {
return trafficClass;
}
@@ -122,33 +131,48 @@ public class DefaultDatagramSessionConfig extends AbstractDatagramSessionConfig
/**
* @see DatagramSocket#setTrafficClass(int)
*/
+ @Override
public void setTrafficClass(int trafficClass) {
this.trafficClass = trafficClass;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
protected boolean isBroadcastChanged() {
return broadcast != DEFAULT_BROADCAST;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
protected boolean isReceiveBufferSizeChanged() {
return receiveBufferSize != DEFAULT_RECEIVE_BUFFER_SIZE;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
protected boolean isReuseAddressChanged() {
return reuseAddress != DEFAULT_REUSE_ADDRESS;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
protected boolean isSendBufferSizeChanged() {
return sendBufferSize != DEFAULT_SEND_BUFFER_SIZE;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
protected boolean isTrafficClassChanged() {
return trafficClass != DEFAULT_TRAFFIC_CLASS;
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mina/blob/c87701fb/mina-core/src/main/java/org/apache/mina/transport/socket/SocketAcceptor.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/SocketAcceptor.java b/mina-core/src/main/java/org/apache/mina/transport/socket/SocketAcceptor.java
index 86d1f47..b21f79c 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/SocketAcceptor.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/SocketAcceptor.java
@@ -38,12 +38,14 @@ public interface SocketAcceptor extends IoAcceptor {
* necessarily the firstly bound address.
* This method overrides the {@link IoAcceptor#getLocalAddress()} method.
*/
+ @Override
InetSocketAddress getLocalAddress();
/**
* @return a {@link Set} of the local InetSocketAddress which are bound currently.
* This method overrides the {@link IoAcceptor#getDefaultLocalAddress()} method.
*/
+ @Override
InetSocketAddress getDefaultLocalAddress();
/**
@@ -87,5 +89,6 @@ public interface SocketAcceptor extends IoAcceptor {
* @return the default configuration of the new SocketSessions created by
* this acceptor service.
*/
+ @Override
SocketSessionConfig getSessionConfig();
}
http://git-wip-us.apache.org/repos/asf/mina/blob/c87701fb/mina-core/src/main/java/org/apache/mina/transport/socket/SocketConnector.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/SocketConnector.java b/mina-core/src/main/java/org/apache/mina/transport/socket/SocketConnector.java
index f91b188..0254c55 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/SocketConnector.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/SocketConnector.java
@@ -34,12 +34,14 @@ public interface SocketConnector extends IoConnector {
* is specified in {@link #connect()} method.
* This method overrides the {@link IoConnector#getDefaultRemoteAddress()} method.
*/
+ @Override
InetSocketAddress getDefaultRemoteAddress();
/**
* @return the default configuration of the new SocketSessions created by
* this connect service.
*/
+ @Override
SocketSessionConfig getSessionConfig();
/**
http://git-wip-us.apache.org/repos/asf/mina/blob/c87701fb/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSession.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSession.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSession.java
index bc80d9c..97245cf 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSession.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSession.java
@@ -70,6 +70,10 @@ public abstract class NioSession extends AbstractIoSession {
*/
abstract ByteChannel getChannel();
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public IoFilterChain getFilterChain() {
return filterChain;
}
@@ -93,6 +97,7 @@ public abstract class NioSession extends AbstractIoSession {
/**
* {@inheritDoc}
*/
+ @Override
public IoProcessor<NioSession> getProcessor() {
return processor;
}
@@ -100,6 +105,7 @@ public abstract class NioSession extends AbstractIoSession {
/**
* {@inheritDoc}
*/
+ @Override
public final boolean isActive() {
return key.isValid();
}
http://git-wip-us.apache.org/repos/asf/mina/blob/c87701fb/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java
index bd1cf00..63313d7 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java
@@ -142,6 +142,7 @@ SocketConnector {
/**
* {@inheritDoc}
*/
+ @Override
public TransportMetadata getTransportMetadata() {
return NioSocketSession.METADATA;
}
@@ -149,6 +150,7 @@ SocketConnector {
/**
* {@inheritDoc}
*/
+ @Override
public SocketSessionConfig getSessionConfig() {
return (SocketSessionConfig) sessionConfig;
}
@@ -164,6 +166,7 @@ SocketConnector {
/**
* {@inheritDoc}
*/
+ @Override
public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
super.setDefaultRemoteAddress(defaultRemoteAddress);
}
@@ -316,6 +319,7 @@ SocketConnector {
/**
* {@inheritDoc}
*/
+ @Override
public boolean hasNext() {
return i.hasNext();
}
@@ -323,6 +327,7 @@ SocketConnector {
/**
* {@inheritDoc}
*/
+ @Override
public SocketChannel next() {
SelectionKey key = i.next();
return (SocketChannel) key.channel();
@@ -331,6 +336,7 @@ SocketConnector {
/**
* {@inheritDoc}
*/
+ @Override
public void remove() {
i.remove();
}
http://git-wip-us.apache.org/repos/asf/mina/blob/c87701fb/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketSession.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketSession.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketSession.java
index 8948c55..8bae465 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketSession.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketSession.java
@@ -352,6 +352,7 @@ class NioSocketSession extends NioSession {
/**
* {@inheritDoc}
*/
+ @Override
public final boolean isSecured() {
// If the session does not have a SslFilter, we can return false
IoFilterChain chain = getFilterChain();
[4/4] mina git commit: o Added some missing Javadoc o Fixed some
SonarLint issues
Posted by el...@apache.org.
o Added some missing Javadoc
o Fixed some SonarLint issues
Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/7c080890
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/7c080890
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/7c080890
Branch: refs/heads/2.0
Commit: 7c080890b86005d743903552a53a51db4ad3ee13
Parents: c87701f
Author: Emmanuel L�charny <el...@symas.com>
Authored: Wed Dec 7 19:28:49 2016 +0100
Committer: Emmanuel L�charny <el...@symas.com>
Committed: Wed Dec 7 19:28:49 2016 +0100
----------------------------------------------------------------------
.../main/java/org/apache/mina/core/IoUtil.java | 67 +++++++++++++---
.../apache/mina/core/RuntimeIoException.java | 19 +++++
.../mina/core/write/DefaultWriteRequest.java | 84 ++++++++++++++++++++
.../core/write/NothingWrittenException.java | 56 ++++++++++++-
.../apache/mina/core/write/WriteException.java | 4 +-
.../mina/core/write/WriteRequestWrapper.java | 5 ++
.../mina/core/write/WriteTimeoutException.java | 56 ++++++++++++-
.../write/WriteToClosedSessionException.java | 57 ++++++++++++-
8 files changed, 325 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina/blob/7c080890/mina-core/src/main/java/org/apache/mina/core/IoUtil.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/IoUtil.java b/mina-core/src/main/java/org/apache/mina/core/IoUtil.java
index 98def63..b9711bd 100644
--- a/mina-core/src/main/java/org/apache/mina/core/IoUtil.java
+++ b/mina-core/src/main/java/org/apache/mina/core/IoUtil.java
@@ -39,6 +39,10 @@ import org.apache.mina.core.session.IoSession;
public final class IoUtil {
private static final IoSession[] EMPTY_SESSIONS = new IoSession[0];
+ private IoUtil() {
+ // Do nothing
+ }
+
/**
* Writes the specified {@code message} to the specified {@code sessions}.
* If the specified {@code message} is an {@link IoBuffer}, the buffer is
@@ -49,7 +53,7 @@ public final class IoUtil {
* @return The list of WriteFuture created for each broadcasted message
*/
public static List<WriteFuture> broadcast(Object message, Collection<IoSession> sessions) {
- List<WriteFuture> answer = new ArrayList<WriteFuture>(sessions.size());
+ List<WriteFuture> answer = new ArrayList<>(sessions.size());
broadcast(message, sessions.iterator(), answer);
return answer;
}
@@ -64,7 +68,7 @@ public final class IoUtil {
* @return The list of WriteFuture created for each broadcasted message
*/
public static List<WriteFuture> broadcast(Object message, Iterable<IoSession> sessions) {
- List<WriteFuture> answer = new ArrayList<WriteFuture>();
+ List<WriteFuture> answer = new ArrayList<>();
broadcast(message, sessions.iterator(), answer);
return answer;
}
@@ -79,7 +83,7 @@ public final class IoUtil {
* @return The list of {@link WriteFuture} for the written messages
*/
public static List<WriteFuture> broadcast(Object message, Iterator<IoSession> sessions) {
- List<WriteFuture> answer = new ArrayList<WriteFuture>();
+ List<WriteFuture> answer = new ArrayList<>();
broadcast(message, sessions, answer);
return answer;
}
@@ -98,7 +102,7 @@ public final class IoUtil {
sessions = EMPTY_SESSIONS;
}
- List<WriteFuture> answer = new ArrayList<WriteFuture>(sessions.length);
+ List<WriteFuture> answer = new ArrayList<>(sessions.length);
if (message instanceof IoBuffer) {
for (IoSession s : sessions) {
answer.add(s.write(((IoBuffer) message).duplicate()));
@@ -125,31 +129,78 @@ public final class IoUtil {
}
}
+ /**
+ * Wait on all the {@link IoFuture}s we get, or until one of the {@link IoFuture}s is interrupted
+ *
+ * @param futures The {@link IoFuture}s we are waiting on
+ * @throws InterruptedException If one of the {@link IoFuture} is interrupted
+ */
public static void await(Iterable<? extends IoFuture> futures) throws InterruptedException {
for (IoFuture f : futures) {
f.await();
}
}
+ /**
+ * Wait on all the {@link IoFuture}s we get. This can't get interrupted.
+ *
+ * @param futures The {@link IoFuture}s we are waiting on
+ */
public static void awaitUninterruptably(Iterable<? extends IoFuture> futures) {
for (IoFuture f : futures) {
f.awaitUninterruptibly();
}
}
+ /**
+ * Wait on all the {@link IoFuture}s we get, or until one of the {@link IoFuture}s is interrupted
+ *
+ * @param futures The {@link IoFuture}s we are waiting on
+ * @param timeout The maximum time we wait for the {@link IoFuture}s to complete
+ * @param unit The Time unit to use for the timeout
+ * @return <tt>TRUE</TT> if all the {@link IoFuture} have been completed, <tt>FALSE</tt> if
+ * at least one {@link IoFuture} haas been interrupted
+ * @throws InterruptedException If one of the {@link IoFuture} is interrupted
+ */
public static boolean await(Iterable<? extends IoFuture> futures, long timeout, TimeUnit unit)
throws InterruptedException {
return await(futures, unit.toMillis(timeout));
}
+ /**
+ * Wait on all the {@link IoFuture}s we get, or until one of the {@link IoFuture}s is interrupted
+ *
+ * @param futures The {@link IoFuture}s we are waiting on
+ * @param timeoutMillis The maximum milliseconds we wait for the {@link IoFuture}s to complete
+ * @return <tt>TRUE</TT> if all the {@link IoFuture} have been completed, <tt>FALSE</tt> if
+ * at least one {@link IoFuture} has been interrupted
+ * @throws InterruptedException If one of the {@link IoFuture} is interrupted
+ */
public static boolean await(Iterable<? extends IoFuture> futures, long timeoutMillis) throws InterruptedException {
return await0(futures, timeoutMillis, true);
}
+ /**
+ * Wait on all the {@link IoFuture}s we get.
+ *
+ * @param futures The {@link IoFuture}s we are waiting on
+ * @param timeout The maximum time we wait for the {@link IoFuture}s to complete
+ * @param unit The Time unit to use for the timeout
+ * @return <tt>TRUE</TT> if all the {@link IoFuture} have been completed, <tt>FALSE</tt> if
+ * at least one {@link IoFuture} has been interrupted
+ */
public static boolean awaitUninterruptibly(Iterable<? extends IoFuture> futures, long timeout, TimeUnit unit) {
return awaitUninterruptibly(futures, unit.toMillis(timeout));
}
+ /**
+ * Wait on all the {@link IoFuture}s we get.
+ *
+ * @param futures The {@link IoFuture}s we are waiting on
+ * @param timeoutMillis The maximum milliseconds we wait for the {@link IoFuture}s to complete
+ * @return <tt>TRUE</TT> if all the {@link IoFuture} have been completed, <tt>FALSE</tt> if
+ * at least one {@link IoFuture} has been interrupted
+ */
public static boolean awaitUninterruptibly(Iterable<? extends IoFuture> futures, long timeoutMillis) {
try {
return await0(futures, timeoutMillis, false);
@@ -165,8 +216,10 @@ public final class IoUtil {
boolean lastComplete = true;
Iterator<? extends IoFuture> i = futures.iterator();
+
while (i.hasNext()) {
IoFuture f = i.next();
+
do {
if (interruptable) {
lastComplete = f.await(waitTime);
@@ -176,7 +229,7 @@ public final class IoUtil {
waitTime = timeoutMillis - (System.currentTimeMillis() - startTime);
- if (lastComplete || waitTime <= 0) {
+ if (waitTime <= 0) {
break;
}
} while (!lastComplete);
@@ -188,8 +241,4 @@ public final class IoUtil {
return lastComplete && !i.hasNext();
}
-
- private IoUtil() {
- // Do nothing
- }
}
http://git-wip-us.apache.org/repos/asf/mina/blob/7c080890/mina-core/src/main/java/org/apache/mina/core/RuntimeIoException.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/RuntimeIoException.java b/mina-core/src/main/java/org/apache/mina/core/RuntimeIoException.java
index b014b24..88a4b3d 100644
--- a/mina-core/src/main/java/org/apache/mina/core/RuntimeIoException.java
+++ b/mina-core/src/main/java/org/apache/mina/core/RuntimeIoException.java
@@ -33,18 +33,37 @@ import java.io.IOException;
public class RuntimeIoException extends RuntimeException {
private static final long serialVersionUID = 9029092241311939548L;
+ /**
+ * Create a new RuntimeIoException instance
+ */
public RuntimeIoException() {
super();
}
+ /**
+ * Create a new RuntimeIoException instance
+ *
+ * @param message The error message
+ */
public RuntimeIoException(String message) {
super(message);
}
+ /**
+ * Create a new RuntimeIoException instance
+ *
+ * @param message The error message
+ * @param cause The original exception
+ */
public RuntimeIoException(String message, Throwable cause) {
super(message, cause);
}
+ /**
+ * Create a new RuntimeIoException instance
+ *
+ * @param cause The original exception
+ */
public RuntimeIoException(Throwable cause) {
super(cause);
}
http://git-wip-us.apache.org/repos/asf/mina/blob/7c080890/mina-core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java b/mina-core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java
index f03bbd7..1d1e5fb 100644
--- a/mina-core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java
+++ b/mina-core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java
@@ -37,66 +37,130 @@ public class DefaultWriteRequest implements WriteRequest {
/** An empty FUTURE */
private static final WriteFuture UNUSED_FUTURE = new WriteFuture() {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public boolean isWritten() {
return false;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void setWritten() {
// Do nothing
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public IoSession getSession() {
return null;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void join() {
// Do nothing
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public boolean join(long timeoutInMillis) {
return true;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public boolean isDone() {
return true;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public WriteFuture addListener(IoFutureListener<?> listener) {
throw new IllegalStateException("You can't add a listener to a dummy future.");
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public WriteFuture removeListener(IoFutureListener<?> listener) {
throw new IllegalStateException("You can't add a listener to a dummy future.");
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public WriteFuture await() throws InterruptedException {
return this;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return true;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public boolean await(long timeoutMillis) throws InterruptedException {
return true;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public WriteFuture awaitUninterruptibly() {
return this;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
return true;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public boolean awaitUninterruptibly(long timeoutMillis) {
return true;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public Throwable getException() {
return null;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void setException(Throwable cause) {
// Do nothing
}
@@ -151,18 +215,34 @@ public class DefaultWriteRequest implements WriteRequest {
this.destination = destination;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public WriteFuture getFuture() {
return future;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public Object getMessage() {
return message;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public WriteRequest getOriginalRequest() {
return this;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public SocketAddress getDestination() {
return destination;
}
@@ -190,6 +270,10 @@ public class DefaultWriteRequest implements WriteRequest {
return sb.toString();
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public boolean isEncoded() {
return false;
}
http://git-wip-us.apache.org/repos/asf/mina/blob/7c080890/mina-core/src/main/java/org/apache/mina/core/write/NothingWrittenException.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/write/NothingWrittenException.java b/mina-core/src/main/java/org/apache/mina/core/write/NothingWrittenException.java
index 66e228e..6ccf9c7 100644
--- a/mina-core/src/main/java/org/apache/mina/core/write/NothingWrittenException.java
+++ b/mina-core/src/main/java/org/apache/mina/core/write/NothingWrittenException.java
@@ -31,34 +31,82 @@ public class NothingWrittenException extends WriteException {
private static final long serialVersionUID = -6331979307737691005L;
+ /**
+ * Create a new NothingWrittenException instance
+ *
+ * @param requests The {@link WriteRequest}s that haven't been written
+ * @param message The error message
+ * @param cause The original exception
+ */
public NothingWrittenException(Collection<WriteRequest> requests, String message, Throwable cause) {
super(requests, message, cause);
}
- public NothingWrittenException(Collection<WriteRequest> requests, String s) {
- super(requests, s);
+ /**
+ * Create a new NothingWrittenException instance
+ *
+ * @param requests The {@link WriteRequest}s that haven't been written
+ * @param message The error message
+ */
+ public NothingWrittenException(Collection<WriteRequest> requests, String message) {
+ super(requests, message);
}
+ /**
+ * Create a new NothingWrittenException instance
+ *
+ * @param requests The {@link WriteRequest} that haven't been written
+ * @param cause The original exception
+ */
public NothingWrittenException(Collection<WriteRequest> requests, Throwable cause) {
super(requests, cause);
}
+ /**
+ * Create a new NothingWrittenException instance
+ *
+ * @param requests The {@link WriteRequest} that haven't been written
+ */
public NothingWrittenException(Collection<WriteRequest> requests) {
super(requests);
}
+ /**
+ * Create a new NothingWrittenException instance
+ *
+ * @param request The {@link WriteRequest} that hasn't been written
+ * @param message The error message
+ * @param cause The original exception
+ */
public NothingWrittenException(WriteRequest request, String message, Throwable cause) {
super(request, message, cause);
}
- public NothingWrittenException(WriteRequest request, String s) {
- super(request, s);
+ /**
+ * Create a new NothingWrittenException instance
+ *
+ * @param request The {@link WriteRequest} that hasn't been written
+ * @param message The error message
+ */
+ public NothingWrittenException(WriteRequest request, String message) {
+ super(request, message);
}
+ /**
+ * Create a new NothingWrittenException instance
+ *
+ * @param request The {@link WriteRequest} that hasn't been written
+ * @param cause The original exception
+ */
public NothingWrittenException(WriteRequest request, Throwable cause) {
super(request, cause);
}
+ /**
+ * Create a new NothingWrittenException instance
+ *
+ * @param request The {@link WriteRequest} that hasn't been written
+ */
public NothingWrittenException(WriteRequest request) {
super(request);
}
http://git-wip-us.apache.org/repos/asf/mina/blob/7c080890/mina-core/src/main/java/org/apache/mina/core/write/WriteException.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/write/WriteException.java b/mina-core/src/main/java/org/apache/mina/core/write/WriteException.java
index 193432b..97acbb4 100644
--- a/mina-core/src/main/java/org/apache/mina/core/write/WriteException.java
+++ b/mina-core/src/main/java/org/apache/mina/core/write/WriteException.java
@@ -155,7 +155,7 @@ public class WriteException extends IOException {
}
// Create a list of requests removing duplicates.
- Set<WriteRequest> newRequests = new MapBackedSet<WriteRequest>(new LinkedHashMap<WriteRequest, Boolean>());
+ Set<WriteRequest> newRequests = new MapBackedSet<>(new LinkedHashMap<WriteRequest, Boolean>());
for (WriteRequest r : requests) {
newRequests.add(r.getOriginalRequest());
@@ -169,7 +169,7 @@ public class WriteException extends IOException {
throw new IllegalArgumentException("request");
}
- List<WriteRequest> requests = new ArrayList<WriteRequest>(1);
+ List<WriteRequest> requests = new ArrayList<>(1);
requests.add(request.getOriginalRequest());
return Collections.unmodifiableList(requests);
http://git-wip-us.apache.org/repos/asf/mina/blob/7c080890/mina-core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java b/mina-core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java
index 0941d43..0abea69 100644
--- a/mina-core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java
+++ b/mina-core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java
@@ -47,6 +47,7 @@ public class WriteRequestWrapper implements WriteRequest {
/**
* {@inheritDoc}
*/
+ @Override
public SocketAddress getDestination() {
return parentRequest.getDestination();
}
@@ -54,6 +55,7 @@ public class WriteRequestWrapper implements WriteRequest {
/**
* {@inheritDoc}
*/
+ @Override
public WriteFuture getFuture() {
return parentRequest.getFuture();
}
@@ -61,6 +63,7 @@ public class WriteRequestWrapper implements WriteRequest {
/**
* {@inheritDoc}
*/
+ @Override
public Object getMessage() {
return parentRequest.getMessage();
}
@@ -68,6 +71,7 @@ public class WriteRequestWrapper implements WriteRequest {
/**
* {@inheritDoc}
*/
+ @Override
public WriteRequest getOriginalRequest() {
return parentRequest.getOriginalRequest();
}
@@ -90,6 +94,7 @@ public class WriteRequestWrapper implements WriteRequest {
/**
* {@inheritDoc}
*/
+ @Override
public boolean isEncoded() {
return false;
}
http://git-wip-us.apache.org/repos/asf/mina/blob/7c080890/mina-core/src/main/java/org/apache/mina/core/write/WriteTimeoutException.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/write/WriteTimeoutException.java b/mina-core/src/main/java/org/apache/mina/core/write/WriteTimeoutException.java
index 9ee214c..3fd3e60 100644
--- a/mina-core/src/main/java/org/apache/mina/core/write/WriteTimeoutException.java
+++ b/mina-core/src/main/java/org/apache/mina/core/write/WriteTimeoutException.java
@@ -32,34 +32,82 @@ import org.apache.mina.core.session.IoSessionConfig;
public class WriteTimeoutException extends WriteException {
private static final long serialVersionUID = 3906931157944579121L;
+ /**
+ * Create a new WriteTimeoutException instance
+ *
+ * @param requests The {@link WriteRequest}s for which we have had a timeout
+ * @param message The error message
+ * @param cause The original exception
+ */
public WriteTimeoutException(Collection<WriteRequest> requests, String message, Throwable cause) {
super(requests, message, cause);
}
- public WriteTimeoutException(Collection<WriteRequest> requests, String s) {
- super(requests, s);
+ /**
+ * Create a new WriteTimeoutException instance
+ *
+ * @param requests The {@link WriteRequest}s for which we have had a timeout
+ * @param message The error message
+ */
+ public WriteTimeoutException(Collection<WriteRequest> requests, String message) {
+ super(requests, message);
}
+ /**
+ * Create a new WriteTimeoutException instance
+ *
+ * @param requests The {@link WriteRequest}s for which we have had a timeout
+ * @param cause The original exception
+ */
public WriteTimeoutException(Collection<WriteRequest> requests, Throwable cause) {
super(requests, cause);
}
+ /**
+ * Create a new WriteTimeoutException instance
+ *
+ * @param requests The {@link WriteRequest}s for which we have had a timeout
+ */
public WriteTimeoutException(Collection<WriteRequest> requests) {
super(requests);
}
+ /**
+ * Create a new WriteTimeoutException instance
+ *
+ * @param request The {@link WriteRequest} for which we have had a timeout
+ * @param message The error message
+ * @param cause The original exception
+ */
public WriteTimeoutException(WriteRequest request, String message, Throwable cause) {
super(request, message, cause);
}
- public WriteTimeoutException(WriteRequest request, String s) {
- super(request, s);
+ /**
+ * Create a new WriteTimeoutException instance
+ *
+ * @param request The {@link WriteRequest} for which we have had a timeout
+ * @param message The error message
+ */
+ public WriteTimeoutException(WriteRequest request, String message) {
+ super(request, message);
}
+ /**
+ * Create a new WriteTimeoutException instance
+ *
+ * @param request The {@link WriteRequest} for which we have had a timeout
+ * @param cause The original exception
+ */
public WriteTimeoutException(WriteRequest request, Throwable cause) {
super(request, cause);
}
+ /**
+ * Create a new WriteTimeoutException instance
+ *
+ * @param request The {@link WriteRequest} for which we have had a timeout
+ */
public WriteTimeoutException(WriteRequest request) {
super(request);
}
http://git-wip-us.apache.org/repos/asf/mina/blob/7c080890/mina-core/src/main/java/org/apache/mina/core/write/WriteToClosedSessionException.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/write/WriteToClosedSessionException.java b/mina-core/src/main/java/org/apache/mina/core/write/WriteToClosedSessionException.java
index 620dbe0..13c240c 100644
--- a/mina-core/src/main/java/org/apache/mina/core/write/WriteToClosedSessionException.java
+++ b/mina-core/src/main/java/org/apache/mina/core/write/WriteToClosedSessionException.java
@@ -31,34 +31,83 @@ public class WriteToClosedSessionException extends WriteException {
private static final long serialVersionUID = 5550204573739301393L;
+ /**
+ * Create a new WriteToClosedSessionException instance
+ *
+ * @param requests The {@link WriteRequest}s which have been written on a closed session
+ * @param message The error message
+ * @param cause The original exception
+ */
public WriteToClosedSessionException(Collection<WriteRequest> requests, String message, Throwable cause) {
super(requests, message, cause);
}
- public WriteToClosedSessionException(Collection<WriteRequest> requests, String s) {
- super(requests, s);
+ /**
+ * Create a new WriteToClosedSessionException instance
+ *
+ * @param requests The {@link WriteRequest}s which have been written on a closed session
+ * @param message The error message
+ */
+ public WriteToClosedSessionException(Collection<WriteRequest> requests, String message) {
+ super(requests, message);
}
+ /**
+ * Create a new WriteToClosedSessionException instance
+ *
+ * @param requests The {@link WriteRequest}s which have been written on a closed session
+ * @param cause The original exception
+ */
public WriteToClosedSessionException(Collection<WriteRequest> requests, Throwable cause) {
super(requests, cause);
}
+ /**
+ * Create a new WriteToClosedSessionException instance
+ *
+ * @param requests The {@link WriteRequest}s which have been written on a closed session
+ */
public WriteToClosedSessionException(Collection<WriteRequest> requests) {
super(requests);
}
+ /**
+ * Create a new WriteToClosedSessionException instance
+ *
+ * @param request The {@link WriteRequest} which has been written on a closed session
+ * @param message The error message
+ * @param cause The original exception
+ */
public WriteToClosedSessionException(WriteRequest request, String message, Throwable cause) {
super(request, message, cause);
}
- public WriteToClosedSessionException(WriteRequest request, String s) {
- super(request, s);
+ /**
+ * Create a new WriteToClosedSessionException instance
+ *
+ * @param request The {@link WriteRequest} which has been written on a closed session
+ * @param message The error message
+ */
+ public WriteToClosedSessionException(WriteRequest request, String message) {
+ super(request, message);
}
+ /**
+ * Create a new WriteToClosedSessionException instance
+ *
+ * @param request The {@link WriteRequest} which has been written on a closed session
+ * @param cause The original exception
+ */
public WriteToClosedSessionException(WriteRequest request, Throwable cause) {
super(request, cause);
}
+ /**
+ * Create a new WriteToClosedSessionException instance
+ *
+ * @param request The {@link WriteRequest} which has been written on a closed session
+
+ */
public WriteToClosedSessionException(WriteRequest request) {
super(request);
}
[2/4] mina git commit: o Added some mising Javadoc o Fixing some
Sonarlint warnings
Posted by el...@apache.org.
o Added some mising Javadoc
o Fixing some Sonarlint warnings
Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/37239fd0
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/37239fd0
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/37239fd0
Branch: refs/heads/2.0
Commit: 37239fd01f0483e74e0bfd78abc5aca86dcb48c4
Parents: 9b26714
Author: Emmanuel L�charny <el...@symas.com>
Authored: Wed Dec 7 11:25:25 2016 +0100
Committer: Emmanuel L�charny <el...@symas.com>
Committed: Wed Dec 7 11:25:25 2016 +0100
----------------------------------------------------------------------
.../socket/nio/NioDatagramAcceptor.java | 26 ++++++---
.../socket/nio/NioDatagramConnector.java | 60 +++++++++++++++++++-
.../socket/nio/NioDatagramSession.java | 19 +++++++
.../socket/nio/NioDatagramSessionConfig.java | 12 +++-
4 files changed, 105 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina/blob/37239fd0/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
index 1577ff2..d4c0000 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
@@ -81,11 +81,11 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
private final Semaphore lock = new Semaphore(1);
/** A queue used to store the list of pending Binds */
- private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
+ private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<>();
- private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
+ private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<>();
- private final Queue<NioSession> flushingSessions = new ConcurrentLinkedQueue<NioSession>();
+ private final Queue<NioSession> flushingSessions = new ConcurrentLinkedQueue<>();
private final Map<SocketAddress, DatagramChannel> boundHandles = Collections
.synchronizedMap(new HashMap<SocketAddress, DatagramChannel>());
@@ -150,6 +150,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
* the registered handles have been removed (unbound).
*/
private class Acceptor implements Runnable {
+ @Override
public void run() {
int nHandles = 0;
lastIdleCheckTime = System.currentTimeMillis();
@@ -220,7 +221,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
break;
}
- Map<SocketAddress, DatagramChannel> newHandles = new HashMap<SocketAddress, DatagramChannel>();
+ Map<SocketAddress, DatagramChannel> newHandles = new HashMap<>();
List<SocketAddress> localAddresses = req.getLocalAddresses();
try {
@@ -494,6 +495,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
/**
* {@inheritDoc}
*/
+ @Override
public void add(NioSession session) {
// Nothing to do for UDP
}
@@ -538,7 +540,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
// Update the local addresses.
// setLocalAddresses() shouldn't be called from the worker thread
// because of deadlock.
- Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
+ Set<SocketAddress> newLocalAddresses = new HashSet<>();
for (DatagramChannel handle : boundHandles.values()) {
newLocalAddresses.add(localAddress(handle));
@@ -577,6 +579,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
/**
* {@inheritDoc}
*/
+ @Override
public void flush(NioSession session) {
if (scheduleFlush(session)) {
wakeup();
@@ -596,14 +599,17 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
/**
* {@inheritDoc}
*/
+ @Override
public DatagramSessionConfig getSessionConfig() {
return (DatagramSessionConfig) sessionConfig;
}
+ @Override
public final IoSessionRecycler getSessionRecycler() {
return sessionRecycler;
}
+ @Override
public TransportMetadata getTransportMetadata() {
return NioDatagramSession.METADATA;
}
@@ -665,6 +671,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
/**
* {@inheritDoc}
*/
+ @Override
public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
if (isDisposing()) {
throw new IllegalStateException("The Acceptor is being disposed.");
@@ -681,9 +688,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
try {
return newSessionWithoutLock(remoteAddress, localAddress);
- } catch (RuntimeException e) {
- throw e;
- } catch (Error e) {
+ } catch (RuntimeException | Error e) {
throw e;
} catch (Exception e) {
throw new RuntimeIoException("Failed to create a session.", e);
@@ -732,6 +737,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
/**
* {@inheritDoc}
*/
+ @Override
public void remove(NioSession session) {
getSessionRecycler().remove(session);
getListeners().fireSessionDestroyed(session);
@@ -753,6 +759,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
return ((DatagramChannel) session.getChannel()).send(buffer.buf(), remoteAddress);
}
+ @Override
public void setDefaultLocalAddress(InetSocketAddress localAddress) {
setDefaultLocalAddress((SocketAddress) localAddress);
}
@@ -775,6 +782,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
key.interestOps(newInterestOps);
}
+ @Override
public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
synchronized (bindLock) {
if (isActive()) {
@@ -810,6 +818,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
/**
* {@inheritDoc}
*/
+ @Override
public void updateTrafficControl(NioSession session) {
throw new UnsupportedOperationException();
}
@@ -821,6 +830,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
/**
* {@inheritDoc}
*/
+ @Override
public void write(NioSession session, WriteRequest writeRequest) {
// We will try to write the message directly
long currentTime = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/mina/blob/37239fd0/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
index 9da09de..c101448 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
@@ -93,35 +93,56 @@ DatagramConnector {
*
* @param processorClass the processor class.
* @see SimpleIoProcessorPool#SimpleIoProcessorPool(Class, Executor, int, java.nio.channels.spi.SelectorProvider)
- * @see org.apache.mina.core.service.SimpleIoProcessorPool#DEFAULT_SIZE
* @since 2.0.0-M4
*/
public NioDatagramConnector(Class<? extends IoProcessor<NioSession>> processorClass) {
super(new DefaultDatagramSessionConfig(), processorClass);
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public TransportMetadata getTransportMetadata() {
return NioDatagramSession.METADATA;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public DatagramSessionConfig getSessionConfig() {
return (DatagramSessionConfig) sessionConfig;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public InetSocketAddress getDefaultRemoteAddress() {
return (InetSocketAddress) super.getDefaultRemoteAddress();
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
super.setDefaultRemoteAddress(defaultRemoteAddress);
}
+ /**
+ @Override
+ * {@inheritDoc}
+ */
@Override
protected void init() throws Exception {
// Do nothing
}
+ /**
+ * {@inheritDoc}
+ */
@Override
protected DatagramChannel newHandle(SocketAddress localAddress) throws Exception {
DatagramChannel ch = DatagramChannel.open();
@@ -155,12 +176,18 @@ DatagramConnector {
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
protected boolean connect(DatagramChannel handle, SocketAddress remoteAddress) throws Exception {
handle.connect(remoteAddress);
return true;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
protected NioSession newSession(IoProcessor<NioSession> processor, DatagramChannel handle) {
NioSession session = new NioDatagramSession(this, handle, processor);
@@ -168,50 +195,77 @@ DatagramConnector {
return session;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
protected void close(DatagramChannel handle) throws Exception {
handle.disconnect();
handle.close();
}
+ /**
+ * {@inheritDoc}
+ */
// Unused extension points.
@Override
@SuppressWarnings("unchecked")
protected Iterator<DatagramChannel> allHandles() {
- return Collections.EMPTY_LIST.iterator();
+ return Collections.emptyIterator();
}
+ /**
+ * {@inheritDoc}
+ */
@Override
protected ConnectionRequest getConnectionRequest(DatagramChannel handle) {
throw new UnsupportedOperationException();
}
+ /**
+ * {@inheritDoc}
+ */
@Override
protected void destroy() throws Exception {
// Do nothing
}
+ /**
+ * {@inheritDoc}
+ */
@Override
protected boolean finishConnect(DatagramChannel handle) throws Exception {
throw new UnsupportedOperationException();
}
+ /**
+ * {@inheritDoc}
+ */
@Override
protected void register(DatagramChannel handle, ConnectionRequest request) throws Exception {
throw new UnsupportedOperationException();
}
+ /**
+ * {@inheritDoc}
+ */
@Override
protected int select(int timeout) throws Exception {
return 0;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
@SuppressWarnings("unchecked")
protected Iterator<DatagramChannel> selectedHandles() {
- return Collections.EMPTY_LIST.iterator();
+ return Collections.emptyIterator();
}
+ /**
+ * {@inheritDoc}
+ */
@Override
protected void wakeup() {
// Do nothing
http://git-wip-us.apache.org/repos/asf/mina/blob/37239fd0/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
index 9ca2ba3..180ddfc 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
@@ -66,27 +66,46 @@ class NioDatagramSession extends NioSession {
/**
* {@inheritDoc}
*/
+ @Override
public DatagramSessionConfig getConfig() {
return (DatagramSessionConfig) config;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
DatagramChannel getChannel() {
return (DatagramChannel) channel;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public TransportMetadata getTransportMetadata() {
return METADATA;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public InetSocketAddress getRemoteAddress() {
return remoteAddress;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public InetSocketAddress getLocalAddress() {
return localAddress;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public InetSocketAddress getServiceAddress() {
return (InetSocketAddress) super.getServiceAddress();
http://git-wip-us.apache.org/repos/asf/mina/blob/37239fd0/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSessionConfig.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSessionConfig.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSessionConfig.java
index 552feaf..94df01e 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSessionConfig.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSessionConfig.java
@@ -54,6 +54,7 @@ class NioDatagramSessionConfig extends AbstractDatagramSessionConfig {
*
* @see DatagramSocket#getReceiveBufferSize()
*/
+ @Override
public int getReceiveBufferSize() {
try {
return channel.socket().getReceiveBufferSize();
@@ -72,8 +73,9 @@ class NioDatagramSessionConfig extends AbstractDatagramSessionConfig {
* @throws RuntimeIoException if the socket is closed or if we
* had a SocketException
*
- * @see DatagramSocket#setReceiveBufferSize()
+ * @see DatagramSocket#setReceiveBufferSize(int)
*/
+ @Override
public void setReceiveBufferSize(int receiveBufferSize) {
try {
channel.socket().setReceiveBufferSize(receiveBufferSize);
@@ -89,6 +91,7 @@ class NioDatagramSessionConfig extends AbstractDatagramSessionConfig {
* @throws RuntimeIoException If the socket is closed or if we get an
* {@link SocketException}
*/
+ @Override
public boolean isBroadcast() {
try {
return channel.socket().getBroadcast();
@@ -97,6 +100,7 @@ class NioDatagramSessionConfig extends AbstractDatagramSessionConfig {
}
}
+ @Override
public void setBroadcast(boolean broadcast) {
try {
channel.socket().setBroadcast(broadcast);
@@ -110,6 +114,7 @@ class NioDatagramSessionConfig extends AbstractDatagramSessionConfig {
* @throws RuntimeIoException If the socket is closed or if we get an
* {@link SocketException}
*/
+ @Override
public int getSendBufferSize() {
try {
return channel.socket().getSendBufferSize();
@@ -123,6 +128,7 @@ class NioDatagramSessionConfig extends AbstractDatagramSessionConfig {
* @throws RuntimeIoException If the socket is closed or if we get an
* {@link SocketException}
*/
+ @Override
public void setSendBufferSize(int sendBufferSize) {
try {
channel.socket().setSendBufferSize(sendBufferSize);
@@ -138,6 +144,7 @@ class NioDatagramSessionConfig extends AbstractDatagramSessionConfig {
* @throws RuntimeIoException If the socket is closed or if we get an
* {@link SocketException}
*/
+ @Override
public boolean isReuseAddress() {
try {
return channel.socket().getReuseAddress();
@@ -151,6 +158,7 @@ class NioDatagramSessionConfig extends AbstractDatagramSessionConfig {
* @throws RuntimeIoException If the socket is closed or if we get an
* {@link SocketException}
*/
+ @Override
public void setReuseAddress(boolean reuseAddress) {
try {
channel.socket().setReuseAddress(reuseAddress);
@@ -168,6 +176,7 @@ class NioDatagramSessionConfig extends AbstractDatagramSessionConfig {
* @throws RuntimeIoException If the socket is closed or if we get an
* {@link SocketException}
*/
+ @Override
public int getTrafficClass() {
try {
return channel.socket().getTrafficClass();
@@ -181,6 +190,7 @@ class NioDatagramSessionConfig extends AbstractDatagramSessionConfig {
* @throws RuntimeIoException If the socket is closed or if we get an
* {@link SocketException}
*/
+ @Override
public void setTrafficClass(int trafficClass) {
try {
channel.socket().setTrafficClass(trafficClass);