You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2016/12/07 18:51:20 UTC

[1/4] mina git commit: o Added some missing Javadoc o Fixed some Sonarlint warnings

Repository: mina
Updated Branches:
  refs/heads/2.0 bf0254f34 -> 7c080890b


o Added some missing Javadoc
o Fixed some Sonarlint warnings

Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/9b26714b
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/9b26714b
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/9b26714b

Branch: refs/heads/2.0
Commit: 9b26714b1b59b7f507ddc3dfca941af323062e8d
Parents: bf0254f
Author: Emmanuel Lécharny <el...@symas.com>
Authored: Wed Dec 7 11:18:53 2016 +0100
Committer: Emmanuel Lécharny <el...@symas.com>
Committed: Wed Dec 7 11:18:53 2016 +0100

----------------------------------------------------------------------
 .../core/polling/AbstractPollingIoAcceptor.java |  180 +--
 .../polling/AbstractPollingIoConnector.java     |  276 +++--
 .../polling/AbstractPollingIoProcessor.java     | 1124 +++++++++---------
 3 files changed, 823 insertions(+), 757 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina/blob/9b26714b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
index 86cc31f..bf1bbf0 100644
--- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
+++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
@@ -65,6 +65,8 @@ import org.apache.mina.util.ExceptionMonitor;
  * by the subclassing implementation.
  * 
  * @see NioSocketAcceptor for a example of implementation
+ * @param <H> The type of IoHandler
+ * @param <S> The type of IoSession
  * 
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
@@ -435,8 +437,12 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
      * The loop is stopped when all the bound handlers are unbound.
      */
     private class Acceptor implements Runnable {
+        /**
+         * {@inheritDoc}
+         */
+        @Override
         public void run() {
-            assert (acceptorRef.get() == this);
+            assert acceptorRef.get() == this;
 
             int nHandles = 0;
 
@@ -466,16 +472,16 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
                         acceptorRef.set(null);
 
                         if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
-                            assert (acceptorRef.get() != this);
+                            assert acceptorRef.get() != this;
                             break;
                         }
 
                         if (!acceptorRef.compareAndSet(null, this)) {
-                            assert (acceptorRef.get() != this);
+                            assert acceptorRef.get() != this;
                             break;
                         }
 
-                        assert (acceptorRef.get() == this);
+                        assert acceptorRef.get() == this;
                     }
 
                     if (selected > 0) {
@@ -553,106 +559,106 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
                 session.getProcessor().add(session);
             }
         }
-    }
 
-    /**
-     * Sets up the socket communications.  Sets items such as:
-     * <p/>
-     * Blocking
-     * Reuse address
-     * Receive buffer size
-     * Bind to listen port
-     * Registers OP_ACCEPT for selector
-     */
-    private int registerHandles() {
-        for (;;) {
-            // The register queue contains the list of services to manage
-            // in this acceptor.
-            AcceptorOperationFuture future = registerQueue.poll();
-
-            if (future == null) {
-                return 0;
-            }
-
-            // We create a temporary map to store the bound handles,
-            // as we may have to remove them all if there is an exception
-            // during the sockets opening.
-            Map<SocketAddress, H> newHandles = new ConcurrentHashMap<>();
-            List<SocketAddress> localAddresses = future.getLocalAddresses();
-
-            try {
-                // Process all the addresses
-                for (SocketAddress a : localAddresses) {
-                    H handle = open(a);
-                    newHandles.put(localAddress(handle), handle);
+        /**
+         * Sets up the socket communications.  Sets items such as:
+         * <p/>
+         * Blocking
+         * Reuse address
+         * Receive buffer size
+         * Bind to listen port
+         * Registers OP_ACCEPT for selector
+         */
+        private int registerHandles() {
+            for (;;) {
+                // The register queue contains the list of services to manage
+                // in this acceptor.
+                AcceptorOperationFuture future = registerQueue.poll();
+
+                if (future == null) {
+                    return 0;
                 }
 
-                // Everything went ok, we can now update the map storing
-                // all the bound sockets.
-                boundHandles.putAll(newHandles);
+                // We create a temporary map to store the bound handles,
+                // as we may have to remove them all if there is an exception
+                // during the sockets opening.
+                Map<SocketAddress, H> newHandles = new ConcurrentHashMap<>();
+                List<SocketAddress> localAddresses = future.getLocalAddresses();
 
-                // and notify.
-                future.setDone();
-                
-                return newHandles.size();
-            } catch (Exception e) {
-                // We store the exception in the future
-                future.setException(e);
-            } finally {
-                // Roll back if failed to bind all addresses.
-                if (future.getException() != null) {
-                    for (H handle : newHandles.values()) {
-                        try {
-                            close(handle);
-                        } catch (Exception e) {
-                            ExceptionMonitor.getInstance().exceptionCaught(e);
-                        }
+                try {
+                    // Process all the addresses
+                    for (SocketAddress a : localAddresses) {
+                        H handle = open(a);
+                        newHandles.put(localAddress(handle), handle);
                     }
 
-                    // Wake up the selector to be sure we will process the newly bound handle
-                    // and not block forever in the select()
-                    wakeup();
+                    // Everything went ok, we can now update the map storing
+                    // all the bound sockets.
+                    boundHandles.putAll(newHandles);
+
+                    // and notify.
+                    future.setDone();
+                    
+                    return newHandles.size();
+                } catch (Exception e) {
+                    // We store the exception in the future
+                    future.setException(e);
+                } finally {
+                    // Roll back if failed to bind all addresses.
+                    if (future.getException() != null) {
+                        for (H handle : newHandles.values()) {
+                            try {
+                                close(handle);
+                            } catch (Exception e) {
+                                ExceptionMonitor.getInstance().exceptionCaught(e);
+                            }
+                        }
+
+                        // Wake up the selector to be sure we will process the newly bound handle
+                        // and not block forever in the select()
+                        wakeup();
+                    }
                 }
             }
         }
-    }
 
-    /**
-     * This method just checks to see if anything has been placed into the
-     * cancellation queue.  The only thing that should be in the cancelQueue
-     * is CancellationRequest objects and the only place this happens is in
-     * the doUnbind() method.
-     */
-    private int unregisterHandles() {
-        int cancelledHandles = 0;
-        for (;;) {
-            AcceptorOperationFuture future = cancelQueue.poll();
-            if (future == null) {
-                break;
-            }
+        /**
+         * This method just checks to see if anything has been placed into the
+         * cancellation queue.  The only thing that should be in the cancelQueue
+         * is CancellationRequest objects and the only place this happens is in
+         * the doUnbind() method.
+         */
+        private int unregisterHandles() {
+            int cancelledHandles = 0;
+            for (;;) {
+                AcceptorOperationFuture future = cancelQueue.poll();
+                if (future == null) {
+                    break;
+                }
 
-            // close the channels
-            for (SocketAddress a : future.getLocalAddresses()) {
-                H handle = boundHandles.remove(a);
+                // close the channels
+                for (SocketAddress a : future.getLocalAddresses()) {
+                    H handle = boundHandles.remove(a);
 
-                if (handle == null) {
-                    continue;
-                }
+                    if (handle == null) {
+                        continue;
+                    }
 
-                try {
-                    close(handle);
-                    wakeup(); // wake up again to trigger thread death
-                } catch (Exception e) {
-                    ExceptionMonitor.getInstance().exceptionCaught(e);
-                } finally {
-                    cancelledHandles++;
+                    try {
+                        close(handle);
+                        wakeup(); // wake up again to trigger thread death
+                    } catch (Exception e) {
+                        ExceptionMonitor.getInstance().exceptionCaught(e);
+                    } finally {
+                        cancelledHandles++;
+                    }
                 }
+
+                future.setDone();
             }
 
-            future.setDone();
+            return cancelledHandles;
         }
-
-        return cancelledHandles;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/mina/blob/9b26714b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoConnector.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoConnector.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoConnector.java
index ad68174..32a3956 100644
--- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoConnector.java
+++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoConnector.java
@@ -59,16 +59,18 @@ import org.apache.mina.util.ExceptionMonitor;
  * provided by the subclassing implementation.
  * 
  * @see NioSocketConnector for a example of implementation
+ * @param <H> The type of IoHandler
+ * @param <S> The type of IoSession
  * 
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
-public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H> extends AbstractIoConnector {
+public abstract class AbstractPollingIoConnector<S extends AbstractIoSession, H> extends AbstractIoConnector {
 
-    private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
+    private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<>();
 
-    private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
+    private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<>();
 
-    private final IoProcessor<T> processor;
+    private final IoProcessor<S> processor;
 
     private final boolean createdProcessor;
 
@@ -77,7 +79,7 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
     private volatile boolean selectable;
 
     /** The connector thread */
-    private final AtomicReference<Connector> connectorRef = new AtomicReference<Connector>();
+    private final AtomicReference<Connector> connectorRef = new AtomicReference<>();
 
     /**
      * Constructor for {@link AbstractPollingIoConnector}. You need to provide a
@@ -93,8 +95,8 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
      *            a {@link Class} of {@link IoProcessor} for the associated
      *            {@link IoSession} type.
      */
-    protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
-        this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
+    protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
+        this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true);
     }
 
     /**
@@ -113,9 +115,9 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
      * @param processorCount
      *            the amount of processor to instantiate for the pool
      */
-    protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass,
+    protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
             int processorCount) {
-        this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
+        this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true);
     }
 
     /**
@@ -133,7 +135,7 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
      *            {@link IoHandler} and processing the chains of
      *            {@link IoFilter}
      */
-    protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, IoProcessor<T> processor) {
+    protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, IoProcessor<S> processor) {
         this(sessionConfig, null, processor, false);
     }
 
@@ -156,7 +158,7 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
      *            {@link IoHandler} and processing the chains of
      *            {@link IoFilter}
      */
-    protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor) {
+    protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor) {
         this(sessionConfig, executor, processor, false);
     }
 
@@ -182,7 +184,7 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
      *            tagging the processor as automatically created, so it will be
      *            automatically disposed
      */
-    private AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor,
+    private AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
             boolean createdProcessor) {
         super(sessionConfig, executor);
 
@@ -279,7 +281,7 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
      * @throws Exception
      *             any exception thrown by the underlying systems calls
      */
-    protected abstract T newSession(IoProcessor<T> processor, H handle) throws Exception;
+    protected abstract S newSession(IoProcessor<S> processor, H handle) throws Exception;
 
     /**
      * Close a client socket.
@@ -367,7 +369,7 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
             handle = newHandle(localAddress);
             if (connect(handle, remoteAddress)) {
                 ConnectFuture future = new DefaultConnectFuture();
-                T session = newSession(processor, handle);
+                S session = newSession(processor, handle);
                 initSession(session, future, sessionInitializer);
                 // Forward the remaining process to the IoProcessor.
                 session.getProcessor().add(session);
@@ -413,116 +415,13 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
         }
     }
 
-    private int registerNew() {
-        int nHandles = 0;
-        for (;;) {
-            ConnectionRequest req = connectQueue.poll();
-            if (req == null) {
-                break;
-            }
-
-            H handle = req.handle;
-            try {
-                register(handle, req);
-                nHandles++;
-            } catch (Exception e) {
-                req.setException(e);
-                try {
-                    close(handle);
-                } catch (Exception e2) {
-                    ExceptionMonitor.getInstance().exceptionCaught(e2);
-                }
-            }
-        }
-        return nHandles;
-    }
-
-    private int cancelKeys() {
-        int nHandles = 0;
-
-        for (;;) {
-            ConnectionRequest req = cancelQueue.poll();
-
-            if (req == null) {
-                break;
-            }
-
-            H handle = req.handle;
-
-            try {
-                close(handle);
-            } catch (Exception e) {
-                ExceptionMonitor.getInstance().exceptionCaught(e);
-            } finally {
-                nHandles++;
-            }
-        }
-
-        if (nHandles > 0) {
-            wakeup();
-        }
-
-        return nHandles;
-    }
-
-    /**
-     * Process the incoming connections, creating a new session for each valid
-     * connection.
-     */
-    private int processConnections(Iterator<H> handlers) {
-        int nHandles = 0;
-
-        // Loop on each connection request
-        while (handlers.hasNext()) {
-            H handle = handlers.next();
-            handlers.remove();
-
-            ConnectionRequest connectionRequest = getConnectionRequest(handle);
-
-            if (connectionRequest == null) {
-                continue;
-            }
-
-            boolean success = false;
-            try {
-                if (finishConnect(handle)) {
-                    T session = newSession(processor, handle);
-                    initSession(session, connectionRequest, connectionRequest.getSessionInitializer());
-                    // Forward the remaining process to the IoProcessor.
-                    session.getProcessor().add(session);
-                    nHandles++;
-                }
-                success = true;
-            } catch (Exception e) {
-                connectionRequest.setException(e);
-            } finally {
-                if (!success) {
-                    // The connection failed, we have to cancel it.
-                    cancelQueue.offer(connectionRequest);
-                }
-            }
-        }
-        return nHandles;
-    }
-
-    private void processTimedOutSessions(Iterator<H> handles) {
-        long currentTime = System.currentTimeMillis();
-
-        while (handles.hasNext()) {
-            H handle = handles.next();
-            ConnectionRequest connectionRequest = getConnectionRequest(handle);
-
-            if ((connectionRequest != null) && (currentTime >= connectionRequest.deadline)) {
-                connectionRequest.setException(new ConnectException("Connection timed out."));
-                cancelQueue.offer(connectionRequest);
-            }
-        }
-    }
-
     private class Connector implements Runnable {
-
+        /**
+         * {@inheritDoc}
+         */
+        @Override
         public void run() {
-            assert (connectorRef.get() == this);
+            assert connectorRef.get() == this;
 
             int nHandles = 0;
 
@@ -541,16 +440,16 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
                         connectorRef.set(null);
 
                         if (connectQueue.isEmpty()) {
-                            assert (connectorRef.get() != this);
+                            assert connectorRef.get() != this;
                             break;
                         }
 
                         if (!connectorRef.compareAndSet(null, this)) {
-                            assert (connectorRef.get() != this);
+                            assert connectorRef.get() != this;
                             break;
                         }
 
-                        assert (connectorRef.get() == this);
+                        assert connectorRef.get() == this;
                     }
 
                     if (selected > 0) {
@@ -596,8 +495,117 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
                 }
             }
         }
+
+        private int registerNew() {
+            int nHandles = 0;
+            for (;;) {
+                ConnectionRequest req = connectQueue.poll();
+                if (req == null) {
+                    break;
+                }
+
+                H handle = req.handle;
+                try {
+                    register(handle, req);
+                    nHandles++;
+                } catch (Exception e) {
+                    req.setException(e);
+                    try {
+                        close(handle);
+                    } catch (Exception e2) {
+                        ExceptionMonitor.getInstance().exceptionCaught(e2);
+                    }
+                }
+            }
+            return nHandles;
+        }
+
+        private int cancelKeys() {
+            int nHandles = 0;
+
+            for (;;) {
+                ConnectionRequest req = cancelQueue.poll();
+
+                if (req == null) {
+                    break;
+                }
+
+                H handle = req.handle;
+
+                try {
+                    close(handle);
+                } catch (Exception e) {
+                    ExceptionMonitor.getInstance().exceptionCaught(e);
+                } finally {
+                    nHandles++;
+                }
+            }
+
+            if (nHandles > 0) {
+                wakeup();
+            }
+
+            return nHandles;
+        }
+
+        /**
+         * Process the incoming connections, creating a new session for each valid
+         * connection.
+         */
+        private int processConnections(Iterator<H> handlers) {
+            int nHandles = 0;
+
+            // Loop on each connection request
+            while (handlers.hasNext()) {
+                H handle = handlers.next();
+                handlers.remove();
+
+                ConnectionRequest connectionRequest = getConnectionRequest(handle);
+
+                if (connectionRequest == null) {
+                    continue;
+                }
+
+                boolean success = false;
+                try {
+                    if (finishConnect(handle)) {
+                        S session = newSession(processor, handle);
+                        initSession(session, connectionRequest, connectionRequest.getSessionInitializer());
+                        // Forward the remaining process to the IoProcessor.
+                        session.getProcessor().add(session);
+                        nHandles++;
+                    }
+                    success = true;
+                } catch (Exception e) {
+                    connectionRequest.setException(e);
+                } finally {
+                    if (!success) {
+                        // The connection failed, we have to cancel it.
+                        cancelQueue.offer(connectionRequest);
+                    }
+                }
+            }
+            return nHandles;
+        }
+
+        private void processTimedOutSessions(Iterator<H> handles) {
+            long currentTime = System.currentTimeMillis();
+
+            while (handles.hasNext()) {
+                H handle = handles.next();
+                ConnectionRequest connectionRequest = getConnectionRequest(handle);
+
+                if ((connectionRequest != null) && (currentTime >= connectionRequest.deadline)) {
+                    connectionRequest.setException(new ConnectException("Connection timed out."));
+                    cancelQueue.offer(connectionRequest);
+                }
+            }
+        }
     }
 
+    /**
+     * A ConnectionRequest's Iouture 
+     */
     public final class ConnectionRequest extends DefaultConnectFuture {
         /** The handle associated with this connection request */
         private final H handle;
@@ -608,6 +616,12 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
         /** The callback to call when the session is initialized */
         private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer;
 
+        /**
+         * Creates a new ConnectionRequest instance
+         * 
+         * @param handle The IoHander
+         * @param callback The IoFuture callback
+         */
         public ConnectionRequest(H handle, IoSessionInitializer<? extends ConnectFuture> callback) {
             this.handle = handle;
             long timeout = getConnectTimeoutMillis();
@@ -621,18 +635,30 @@ public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
             this.sessionInitializer = callback;
         }
 
+        /**
+         * @return The IoHandler instance
+         */
         public H getHandle() {
             return handle;
         }
 
+        /**
+         * @return The connection deadline 
+         */
         public long getDeadline() {
             return deadline;
         }
 
+        /**
+         * @return The session initializer callback
+         */
         public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() {
             return sessionInitializer;
         }
 
+        /**
+         * {@inheritDoc}
+         */
         @Override
         public boolean cancel() {
             if (!isDone()) {

http://git-wip-us.apache.org/repos/asf/mina/blob/9b26714b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
index 853b8a3..48794e6 100644
--- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
+++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
@@ -223,7 +223,8 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
      * Say if the list of {@link IoSession} polled by this {@link IoProcessor}
      * is empty
      * 
-     * @return <tt>true</tt> if at least a session is managed by this {@link IoProcessor}
+     * @return <tt>true</tt> if at least a session is managed by this
+     *         {@link IoProcessor}
      */
     protected abstract boolean isSelectorEmpty();
 
@@ -251,16 +252,17 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
     /**
      * Get the state of a session (One of OPENING, OPEN, CLOSING)
      * 
-     * @param session the {@link IoSession} to inspect
+     * @param session
+     *            the {@link IoSession} to inspect
      * @return the state of the session
      */
     protected abstract SessionState getState(S session);
-    
-    
+
     /**
      * Tells if the session ready for writing
      * 
-     * @param session the queried session
+     * @param session
+     *            the queried session
      * @return <tt>true</tt> is ready, <tt>false</tt> if not ready
      */
     protected abstract boolean isWritable(S session);
@@ -268,7 +270,8 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
     /**
      * Tells if the session ready for reading
      * 
-     * @param session the queried session
+     * @param session
+     *            the queried session
      * @return <tt>true</tt> is ready, <tt>false</tt> if not ready
      */
     protected abstract boolean isReadable(S session);
@@ -276,25 +279,32 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
     /**
      * Set the session to be informed when a write event should be processed
      * 
-     * @param session the session for which we want to be interested in write events
-     * @param isInterested <tt>true</tt> for registering, <tt>false</tt> for removing
-     * @throws Exception If there was a problem while registering the session 
+     * @param session
+     *            the session for which we want to be interested in write events
+     * @param isInterested
+     *            <tt>true</tt> for registering, <tt>false</tt> for removing
+     * @throws Exception
+     *             If there was a problem while registering the session
      */
     protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;
 
     /**
      * Set the session to be informed when a read event should be processed
      * 
-     * @param session the session for which we want to be interested in read events
-     * @param isInterested <tt>true</tt> for registering, <tt>false</tt> for removing
-     * @throws Exception If there was a problem while registering the session 
+     * @param session
+     *            the session for which we want to be interested in read events
+     * @param isInterested
+     *            <tt>true</tt> for registering, <tt>false</tt> for removing
+     * @throws Exception
+     *             If there was a problem while registering the session
      */
     protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;
 
     /**
      * Tells if this session is registered for reading
      * 
-     * @param session the queried session
+     * @param session
+     *            the queried session
      * @return <tt>true</tt> is registered for reading
      */
     protected abstract boolean isInterestedInRead(S session);
@@ -302,7 +312,8 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
     /**
      * Tells if this session is registered for writing
      * 
-     * @param session the queried session
+     * @param session
+     *            the queried session
      * @return <tt>true</tt> is registered for writing
      */
     protected abstract boolean isInterestedInWrite(S session);
@@ -310,16 +321,20 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
     /**
      * Initialize the polling of a session. Add it to the polling process.
      * 
-     * @param session the {@link IoSession} to add to the polling
-     * @throws Exception any exception thrown by the underlying system calls
+     * @param session
+     *            the {@link IoSession} to add to the polling
+     * @throws Exception
+     *             any exception thrown by the underlying system calls
      */
     protected abstract void init(S session) throws Exception;
 
     /**
      * Destroy the underlying client socket handle
      * 
-     * @param session the {@link IoSession}
-     * @throws Exception any exception thrown by the underlying system calls
+     * @param session
+     *            the {@link IoSession}
+     * @throws Exception
+     *             any exception thrown by the underlying system calls
      */
     protected abstract void destroy(S session) throws Exception;
 
@@ -327,10 +342,13 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
      * Reads a sequence of bytes from a {@link IoSession} into the given
      * {@link IoBuffer}. Is called when the session was found ready for reading.
      * 
-     * @param session the session to read
-     * @param buf the buffer to fill
+     * @param session
+     *            the session to read
+     * @param buf
+     *            the buffer to fill
      * @return the number of bytes read
-     * @throws Exception any exception thrown by the underlying system calls
+     * @throws Exception
+     *             any exception thrown by the underlying system calls
      */
     protected abstract int read(S session, IoBuffer buf) throws Exception;
 
@@ -338,12 +356,16 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
      * Write a sequence of bytes to a {@link IoSession}, means to be called when
      * a session was found ready for writing.
      * 
-     * @param session the session to write
-     * @param buf the buffer to write
-     * @param length the number of bytes to write can be superior to the number of
+     * @param session
+     *            the session to write
+     * @param buf
+     *            the buffer to write
+     * @param length
+     *            the number of bytes to write; it can be greater than the number of
      *            bytes remaining in the buffer
      * @return the number of byte written
-     * @throws IOException any exception thrown by the underlying system calls
+     * @throws IOException
+     *             any exception thrown by the underlying system calls
      */
     protected abstract int write(S session, IoBuffer buf, int length) throws IOException;
 
@@ -353,11 +375,15 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
      * {@link UnsupportedOperationException} so the file will be send using
      * usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
      * 
-     * @param session the session to write
-     * @param region the file region to write
-     * @param length the length of the portion to send
+     * @param session
+     *            the session to write
+     * @param region
+     *            the file region to write
+     * @param length
+     *            the length of the portion to send
      * @return the number of written bytes
-     * @throws Exception any exception thrown by the underlying system calls
+     * @throws Exception
+     *             any exception thrown by the underlying system calls
      */
     protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;
 
@@ -417,18 +443,11 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
         }
     }
 
-    private void scheduleFlush(S session) {
-        // add the session to the queue if it's not already
-        // in the queue
-        if (session.setScheduledForFlush(true)) {
-            flushingSessions.add(session);
-        }
-    }
-
     /**
      * Updates the traffic mask for a given session
      * 
-     * @param session the session to update
+     * @param session
+     *            the session to update
      */
     public final void updateTrafficMask(S session) {
         trafficControllingSessions.add(session);
@@ -460,7 +479,8 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
      * trash the buggy selector and create a new one, registring all the sockets
      * on it.
      * 
-     * @throws IOException If we got an exception
+     * @throws IOException
+     *             If we got an exception
      */
     protected abstract void registerNewSelector() throws IOException;
 
@@ -470,202 +490,11 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
      * have to loop.
      * 
      * @return <tt>true</tt> if a connection has been brutally closed.
-     * @throws IOException If we got an exception
+     * @throws IOException
+     *             If we got an exception
      */
     protected abstract boolean isBrokenConnection() throws IOException;
 
-    /**
-     * Loops over the new sessions blocking queue and returns the number of
-     * sessions which are effectively created
-     * 
-     * @return The number of new sessions
-     */
-    private int handleNewSessions() {
-        int addedSessions = 0;
-
-        for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
-            if (addNow(session)) {
-                // A new session has been created
-                addedSessions++;
-            }
-        }
-
-        return addedSessions;
-    }
-
-    /**
-     * Process a new session : - initialize it - create its chain - fire the
-     * CREATED listeners if any
-     * 
-     * @param session The session to create
-     * @return <tt>true</tt> if the session has been registered
-     */
-    private boolean addNow(S session) {
-        boolean registered = false;
-
-        try {
-            init(session);
-            registered = true;
-
-            // Build the filter chain of this session.
-            IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
-            chainBuilder.buildFilterChain(session.getFilterChain());
-
-            // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
-            // in AbstractIoFilterChain.fireSessionOpened().
-            // Propagate the SESSION_CREATED event up to the chain
-            IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
-            listeners.fireSessionCreated(session);
-        } catch (Exception e) {
-            ExceptionMonitor.getInstance().exceptionCaught(e);
-
-            try {
-                destroy(session);
-            } catch (Exception e1) {
-                ExceptionMonitor.getInstance().exceptionCaught(e1);
-            } finally {
-                registered = false;
-            }
-        }
-
-        return registered;
-    }
-
-    private int removeSessions() {
-        int removedSessions = 0;
-
-        for (S session = removingSessions.poll(); session != null;session = removingSessions.poll()) {
-            SessionState state = getState(session);
-
-            // Now deal with the removal accordingly to the session's state
-            switch (state) {
-                case OPENED:
-                    // Try to remove this session
-                    if (removeNow(session)) {
-                        removedSessions++;
-                    }
-                    
-                    break;
-    
-                case CLOSING:
-                    // Skip if channel is already closed
-                    // In any case, remove the session from the queue
-                    removedSessions++;
-                    break;
-    
-                case OPENING:
-                    // Remove session from the newSessions queue and
-                    // remove it
-                    newSessions.remove(session);
-    
-                    if (removeNow(session)) {
-                        removedSessions++;
-                    }
-    
-                    break;
-    
-                default:
-                    throw new IllegalStateException(String.valueOf(state));
-            }
-        }
-
-        return removedSessions;
-    }
-
-    private boolean removeNow(S session) {
-        clearWriteRequestQueue(session);
-
-        try {
-            destroy(session);
-            return true;
-        } catch (Exception e) {
-            IoFilterChain filterChain = session.getFilterChain();
-            filterChain.fireExceptionCaught(e);
-        } finally {
-            try {
-                clearWriteRequestQueue(session);
-                ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
-            } catch (Exception e) {
-                // The session was either destroyed or not at this point.
-                // We do not want any exception thrown from this "cleanup" code to change
-                // the return value by bubbling up.
-                IoFilterChain filterChain = session.getFilterChain();
-                filterChain.fireExceptionCaught(e);
-            }
-        }
-        
-        return false;
-    }
-
-    private void clearWriteRequestQueue(S session) {
-        WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
-        WriteRequest req;
-
-        List<WriteRequest> failedRequests = new ArrayList<>();
-
-        if ((req = writeRequestQueue.poll(session)) != null) {
-            Object message = req.getMessage();
-
-            if (message instanceof IoBuffer) {
-                IoBuffer buf = (IoBuffer) message;
-
-                // The first unwritten empty buffer must be
-                // forwarded to the filter chain.
-                if (buf.hasRemaining()) {
-                    buf.reset();
-                    failedRequests.add(req);
-                } else {
-                    IoFilterChain filterChain = session.getFilterChain();
-                    filterChain.fireMessageSent(req);
-                }
-            } else {
-                failedRequests.add(req);
-            }
-
-            // Discard others.
-            while ((req = writeRequestQueue.poll(session)) != null) {
-                failedRequests.add(req);
-            }
-        }
-
-        // Create an exception and notify.
-        if (!failedRequests.isEmpty()) {
-            WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
-
-            for (WriteRequest r : failedRequests) {
-                session.decreaseScheduledBytesAndMessages(r);
-                r.getFuture().setException(cause);
-            }
-
-            IoFilterChain filterChain = session.getFilterChain();
-            filterChain.fireExceptionCaught(cause);
-        }
-    }
-
-    private void process() throws Exception {
-        for (Iterator<S> i = selectedSessions(); i.hasNext();) {
-            S session = i.next();
-            process(session);
-            i.remove();
-        }
-    }
-
-    /**
-     * Deal with session ready for the read or write operations, or both.
-     */
-    private void process(S session) {
-        // Process Reads
-        if (isReadable(session) && !session.isReadSuspended()) {
-            read(session);
-        }
-
-        // Process writes
-        if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) {
-            // add the session to the queue, if it's not already there
-            flushingSessions.add(session);
-        }
-    }
-
     private void read(S session) {
         IoSessionConfig config = session.getConfig();
         int bufferSize = config.getReadBufferSize();
@@ -717,12 +546,11 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
                 filterChain.fireInputClosed();
             }
         } catch (Exception e) {
-            if (e instanceof IOException) {
-                if (!(e instanceof PortUnreachableException)
+            if ((e instanceof IOException) &&
+                (!(e instanceof PortUnreachableException)
                         || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
-                        || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
-                    scheduleRemove(session);
-                }
+                        || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable())) {
+                scheduleRemove(session);
             }
 
             IoFilterChain filterChain = session.getFilterChain();
@@ -730,306 +558,6 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
         }
     }
 
-    private void notifyIdleSessions(long currentTime) throws Exception {
-        // process idle sessions
-        if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
-            lastIdleCheckTime = currentTime;
-            AbstractIoSession.notifyIdleness(allSessions(), currentTime);
-        }
-    }
-
-    /**
-     * Write all the pending messages
-     */
-    private void flush(long currentTime) {
-        if (flushingSessions.isEmpty()) {
-            return;
-        }
-
-        do {
-            S session = flushingSessions.poll(); // the same one with
-                                                 // firstSession
-
-            if (session == null) {
-                // Just in case ... It should not happen.
-                break;
-            }
-
-            // Reset the Schedule for flush flag for this session,
-            // as we are flushing it now
-            session.unscheduledForFlush();
-
-            SessionState state = getState(session);
-
-            switch (state) {
-                case OPENED:
-                    try {
-                        boolean flushedAll = flushNow(session, currentTime);
-    
-                        if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
-                                && !session.isScheduledForFlush()) {
-                            scheduleFlush(session);
-                        }
-                    } catch (Exception e) {
-                        scheduleRemove(session);
-                        session.closeNow();
-                        IoFilterChain filterChain = session.getFilterChain();
-                        filterChain.fireExceptionCaught(e);
-                    }
-    
-                    break;
-    
-                case CLOSING:
-                    // Skip if the channel is already closed.
-                    break;
-    
-                case OPENING:
-                    // Retry later if session is not yet fully initialized.
-                    // (In case that Session.write() is called before addSession()
-                    // is processed)
-                    scheduleFlush(session);
-                    return;
-    
-                default:
-                    throw new IllegalStateException(String.valueOf(state));
-            }
-
-        } while (!flushingSessions.isEmpty());
-    }
-
-    private boolean flushNow(S session, long currentTime) {
-        if (!session.isConnected()) {
-            scheduleRemove(session);
-            return false;
-        }
-
-        final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
-
-        final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
-
-        // Set limitation for the number of written bytes for read-write
-        // fairness. I used maxReadBufferSize * 3 / 2, which yields best
-        // performance in my experience while not breaking fairness much.
-        final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
-                + (session.getConfig().getMaxReadBufferSize() >>> 1);
-        int writtenBytes = 0;
-        WriteRequest req = null;
-
-        try {
-            // Clear OP_WRITE
-            setInterestedInWrite(session, false);
-
-            do {
-                // Check for pending writes.
-                req = session.getCurrentWriteRequest();
-
-                if (req == null) {
-                    req = writeRequestQueue.poll(session);
-
-                    if (req == null) {
-                        break;
-                    }
-
-                    session.setCurrentWriteRequest(req);
-                }
-
-                int localWrittenBytes;
-                Object message = req.getMessage();
-
-                if (message instanceof IoBuffer) {
-                    localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
-                            currentTime);
-
-                    if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
-                        // the buffer isn't empty, we re-interest it in writing
-                        writtenBytes += localWrittenBytes;
-                        setInterestedInWrite(session, true);
-                        return false;
-                    }
-                } else if (message instanceof FileRegion) {
-                    localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
-                            currentTime);
-
-                    // Fix for Java bug on Linux
-                    // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
-                    // If there's still data to be written in the FileRegion,
-                    // return 0 indicating that we need
-                    // to pause until writing may resume.
-                    if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
-                        writtenBytes += localWrittenBytes;
-                        setInterestedInWrite(session, true);
-                        return false;
-                    }
-                } else {
-                    throw new IllegalStateException("Don't know how to handle message of type '"
-                            + message.getClass().getName() + "'.  Are you missing a protocol encoder?");
-                }
-
-                if (localWrittenBytes == 0) {
-
-                    // Kernel buffer is full.
-                    if (!req.equals(AbstractIoSession.MESSAGE_SENT_REQUEST)) {
-                        setInterestedInWrite(session, true);
-                        return false;
-                    }
-                } else {
-                    writtenBytes += localWrittenBytes;
-    
-                    if (writtenBytes >= maxWrittenBytes) {
-                        // Wrote too much
-                        scheduleFlush(session);
-                        return false;
-                    }
-                }
-
-                if (message instanceof IoBuffer) {
-                    ((IoBuffer) message).free();
-                }
-            } while (writtenBytes < maxWrittenBytes);
-        } catch (Exception e) {
-            if (req != null) {
-                req.getFuture().setException(e);
-            }
-
-            IoFilterChain filterChain = session.getFilterChain();
-            filterChain.fireExceptionCaught(e);
-            return false;
-        }
-
-        return true;
-    }
-
-    private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
-            throws Exception {
-        IoBuffer buf = (IoBuffer) req.getMessage();
-        int localWrittenBytes = 0;
-
-        if (buf.hasRemaining()) {
-            int length;
-
-            if (hasFragmentation) {
-                length = Math.min(buf.remaining(), maxLength);
-            } else {
-                length = buf.remaining();
-            }
-
-            try {
-                localWrittenBytes = write(session, buf, length);
-            } catch (IOException ioe) {
-                // We have had an issue while trying to send data to the
-                // peer : let's close the session.
-                buf.free();
-                session.closeNow();
-                removeNow(session);
-
-                return 0;
-            }
-        }
-
-        session.increaseWrittenBytes(localWrittenBytes, currentTime);
-        
-        // Now, forward the original message
-        if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
-            // Buffer has been sent, clear the current request.
-            Object originalMessage = req.getOriginalRequest().getMessage();
-
-            if (originalMessage instanceof IoBuffer) {
-                buf = ((IoBuffer)req.getOriginalRequest().getMessage());
-
-                int pos = buf.position();
-                buf.reset();
-                fireMessageSent(session, req);
-                // And set it back to its position
-                buf.position(pos);
-            } else {
-                fireMessageSent(session, req);
-            }
-        }
-
-        return localWrittenBytes;
-    }
-
-    private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
-            throws Exception {
-        int localWrittenBytes;
-        FileRegion region = (FileRegion) req.getMessage();
-
-        if (region.getRemainingBytes() > 0) {
-            int length;
-
-            if (hasFragmentation) {
-                length = (int) Math.min(region.getRemainingBytes(), maxLength);
-            } else {
-                length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
-            }
-
-            localWrittenBytes = transferFile(session, region, length);
-            region.update(localWrittenBytes);
-        } else {
-            localWrittenBytes = 0;
-        }
-
-        session.increaseWrittenBytes(localWrittenBytes, currentTime);
-
-        if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
-            fireMessageSent(session, req);
-        }
-
-        return localWrittenBytes;
-    }
-
-    private void fireMessageSent(S session, WriteRequest req) {
-        session.setCurrentWriteRequest(null);
-        IoFilterChain filterChain = session.getFilterChain();
-        filterChain.fireMessageSent(req);
-    }
-
-    /**
-     * Update the trafficControl for all the session.
-     */
-    private void updateTrafficMask() {
-        int queueSize = trafficControllingSessions.size();
-
-        while (queueSize > 0) {
-            S session = trafficControllingSessions.poll();
-
-            if (session == null) {
-                // We are done with this queue.
-                return;
-            }
-
-            SessionState state = getState(session);
-
-            switch (state) {
-            case OPENED:
-                updateTrafficControl(session);
-
-                break;
-
-            case CLOSING:
-                break;
-
-            case OPENING:
-                // Retry later if session is not yet fully initialized.
-                // (In case that Session.suspend??() or session.resume??() is
-                // called before addSession() is processed)
-                // We just put back the session at the end of the queue.
-                trafficControllingSessions.add(session);
-                break;
-
-            default:
-                throw new IllegalStateException(String.valueOf(state));
-            }
-
-            // As we have handled one session, decrement the number of
-            // remaining sessions. The OPENING session will be processed
-            // with the next select(), as the queue size has been decreased,
-            // even
-            // if the session has been pushed at the end of the queue
-            queueSize--;
-        }
-    }
-
     /**
      * {@inheritDoc}
      */
@@ -1058,8 +586,12 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
      * sessions -
      */
     private class Processor implements Runnable {
+        /**
+         * {@inheritDoc}
+         */
+        @Override
         public void run() {
-            assert (processorRef.get() == this);
+            assert processorRef.get() == this;
 
             int nSessions = 0;
             lastIdleCheckTime = System.currentTimeMillis();
@@ -1137,31 +669,31 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
 
                         if (newSessions.isEmpty() && isSelectorEmpty()) {
                             // newSessions.add() precedes startupProcessor
-                            assert (processorRef.get() != this);
+                            assert processorRef.get() != this;
                             break;
                         }
 
-                        assert (processorRef.get() != this);
+                        assert processorRef.get() != this;
 
                         if (!processorRef.compareAndSet(null, this)) {
                             // startupProcessor won race, so must exit processor
-                            assert (processorRef.get() != this);
+                            assert processorRef.get() != this;
                             break;
                         }
 
-                        assert (processorRef.get() == this);
+                        assert processorRef.get() == this;
                     }
 
                     // Disconnect all sessions immediately if disposal has been
                     // requested so that we exit this loop eventually.
                     if (isDisposing()) {
                         boolean hasKeys = false;
-                        
+
                         for (Iterator<S> i = allSessions(); i.hasNext();) {
                             IoSession session = i.next();
-                            
+
                             if (session.isActive()) {
-                                scheduleRemove((S)session);
+                                scheduleRemove((S) session);
                                 hasKeys = true;
                             }
                         }
@@ -1198,5 +730,507 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
                 disposalFuture.setValue(true);
             }
         }
+
+        /**
+         * Loops over the new sessions blocking queue and returns the number of
+         * sessions which are effectively created
+         * 
+         * @return The number of new sessions
+         */
+        private int handleNewSessions() {
+            int addedSessions = 0;
+
+            for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
+                if (addNow(session)) {
+                    // A new session has been created
+                    addedSessions++;
+                }
+            }
+
+            return addedSessions;
+        }
+
+        private void notifyIdleSessions(long currentTime) throws Exception {
+            // process idle sessions
+            if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
+                lastIdleCheckTime = currentTime;
+                AbstractIoSession.notifyIdleness(allSessions(), currentTime);
+            }
+        }
+
+        /**
+         * Update the trafficControl for all the session.
+         */
+        private void updateTrafficMask() {
+            int queueSize = trafficControllingSessions.size();
+
+            while (queueSize > 0) {
+                S session = trafficControllingSessions.poll();
+
+                if (session == null) {
+                    // We are done with this queue.
+                    return;
+                }
+
+                SessionState state = getState(session);
+
+                switch (state) {
+                case OPENED:
+                    updateTrafficControl(session);
+
+                    break;
+
+                case CLOSING:
+                    break;
+
+                case OPENING:
+                    // Retry later if session is not yet fully initialized.
+                    // (In case that Session.suspend??() or session.resume??() is
+                    // called before addSession() is processed)
+                    // We just put back the session at the end of the queue.
+                    trafficControllingSessions.add(session);
+                    break;
+
+                default:
+                    throw new IllegalStateException(String.valueOf(state));
+                }
+
+                // As we have handled one session, decrement the number of
+                // remaining sessions. The OPENING session will be processed
+                // with the next select(), as the queue size has been decreased,
+                // even
+                // if the session has been pushed at the end of the queue
+                queueSize--;
+            }
+        }
+
+        /**
+         * Process a new session : - initialize it - create its chain - fire the
+         * CREATED listeners if any
+         * 
+         * @param session
+         *            The session to create
+         * @return <tt>true</tt> if the session has been registered
+         */
+        private boolean addNow(S session) {
+            boolean registered = false;
+
+            try {
+                init(session);
+                registered = true;
+
+                // Build the filter chain of this session.
+                IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
+                chainBuilder.buildFilterChain(session.getFilterChain());
+
+                // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
+                // in AbstractIoFilterChain.fireSessionOpened().
+                // Propagate the SESSION_CREATED event up to the chain
+                IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
+                listeners.fireSessionCreated(session);
+            } catch (Exception e) {
+                ExceptionMonitor.getInstance().exceptionCaught(e);
+
+                try {
+                    destroy(session);
+                } catch (Exception e1) {
+                    ExceptionMonitor.getInstance().exceptionCaught(e1);
+                } finally {
+                    registered = false;
+                }
+            }
+
+            return registered;
+        }
+
+        private int removeSessions() {
+            int removedSessions = 0;
+
+            for (S session = removingSessions.poll(); session != null; session = removingSessions.poll()) {
+                SessionState state = getState(session);
+
+                // Now deal with the removal accordingly to the session's state
+                switch (state) {
+                case OPENED:
+                    // Try to remove this session
+                    if (removeNow(session)) {
+                        removedSessions++;
+                    }
+
+                    break;
+
+                case CLOSING:
+                    // Skip if channel is already closed
+                    // In any case, remove the session from the queue
+                    removedSessions++;
+                    break;
+
+                case OPENING:
+                    // Remove session from the newSessions queue and
+                    // remove it
+                    newSessions.remove(session);
+
+                    if (removeNow(session)) {
+                        removedSessions++;
+                    }
+
+                    break;
+
+                default:
+                    throw new IllegalStateException(String.valueOf(state));
+                }
+            }
+
+            return removedSessions;
+        }
+
+        /**
+         * Write all the pending messages
+         */
+        private void flush(long currentTime) {
+            if (flushingSessions.isEmpty()) {
+                return;
+            }
+
+            do {
+                S session = flushingSessions.poll(); // the same one with
+                                                     // firstSession
+
+                if (session == null) {
+                    // Just in case ... It should not happen.
+                    break;
+                }
+
+                // Reset the Schedule for flush flag for this session,
+                // as we are flushing it now
+                session.unscheduledForFlush();
+
+                SessionState state = getState(session);
+
+                switch (state) {
+                case OPENED:
+                    try {
+                        boolean flushedAll = flushNow(session, currentTime);
+
+                        if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
+                                && !session.isScheduledForFlush()) {
+                            scheduleFlush(session);
+                        }
+                    } catch (Exception e) {
+                        scheduleRemove(session);
+                        session.closeNow();
+                        IoFilterChain filterChain = session.getFilterChain();
+                        filterChain.fireExceptionCaught(e);
+                    }
+
+                    break;
+
+                case CLOSING:
+                    // Skip if the channel is already closed.
+                    break;
+
+                case OPENING:
+                    // Retry later if session is not yet fully initialized.
+                    // (In case that Session.write() is called before addSession()
+                    // is processed)
+                    scheduleFlush(session);
+                    return;
+
+                default:
+                    throw new IllegalStateException(String.valueOf(state));
+                }
+
+            } while (!flushingSessions.isEmpty());
+        }
+
+        private boolean flushNow(S session, long currentTime) {
+            if (!session.isConnected()) {
+                scheduleRemove(session);
+                return false;
+            }
+
+            final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
+
+            final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
+
+            // Set limitation for the number of written bytes for read-write
+            // fairness. I used maxReadBufferSize * 3 / 2, which yields best
+            // performance in my experience while not breaking fairness much.
+            final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
+                    + (session.getConfig().getMaxReadBufferSize() >>> 1);
+            int writtenBytes = 0;
+            WriteRequest req = null;
+
+            try {
+                // Clear OP_WRITE
+                setInterestedInWrite(session, false);
+
+                do {
+                    // Check for pending writes.
+                    req = session.getCurrentWriteRequest();
+
+                    if (req == null) {
+                        req = writeRequestQueue.poll(session);
+
+                        if (req == null) {
+                            break;
+                        }
+
+                        session.setCurrentWriteRequest(req);
+                    }
+
+                    int localWrittenBytes;
+                    Object message = req.getMessage();
+
+                    if (message instanceof IoBuffer) {
+                        localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
+                                currentTime);
+
+                        if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
+                            // the buffer isn't empty, we re-interest it in writing
+                            setInterestedInWrite(session, true);
+
+                            return false;
+                        }
+                    } else if (message instanceof FileRegion) {
+                        localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
+                                currentTime);
+
+                        // Fix for Java bug on Linux
+                        // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
+                        // If there's still data to be written in the FileRegion,
+                        // return 0 indicating that we need
+                        // to pause until writing may resume.
+                        if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
+                            setInterestedInWrite(session, true);
+
+                            return false;
+                        }
+                    } else {
+                        throw new IllegalStateException("Don't know how to handle message of type '"
+                                + message.getClass().getName() + "'.  Are you missing a protocol encoder?");
+                    }
+
+                    if (localWrittenBytes == 0) {
+
+                        // Kernel buffer is full.
+                        if (!req.equals(AbstractIoSession.MESSAGE_SENT_REQUEST)) {
+                            setInterestedInWrite(session, true);
+                            return false;
+                        }
+                    } else {
+                        writtenBytes += localWrittenBytes;
+
+                        if (writtenBytes >= maxWrittenBytes) {
+                            // Wrote too much
+                            scheduleFlush(session);
+                            return false;
+                        }
+                    }
+
+                    if (message instanceof IoBuffer) {
+                        ((IoBuffer) message).free();
+                    }
+                } while (writtenBytes < maxWrittenBytes);
+            } catch (Exception e) {
+                if (req != null) {
+                    req.getFuture().setException(e);
+                }
+
+                IoFilterChain filterChain = session.getFilterChain();
+                filterChain.fireExceptionCaught(e);
+                return false;
+            }
+
+            return true;
+        }
+
+        private void scheduleFlush(S session) {
+            // add the session to the queue if it's not already
+            // in the queue
+            if (session.setScheduledForFlush(true)) {
+                flushingSessions.add(session);
+            }
+        }
+
+        private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
+                throws Exception {
+            int localWrittenBytes;
+            FileRegion region = (FileRegion) req.getMessage();
+
+            if (region.getRemainingBytes() > 0) {
+                int length;
+
+                if (hasFragmentation) {
+                    length = (int) Math.min(region.getRemainingBytes(), maxLength);
+                } else {
+                    length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
+                }
+
+                localWrittenBytes = transferFile(session, region, length);
+                region.update(localWrittenBytes);
+            } else {
+                localWrittenBytes = 0;
+            }
+
+            session.increaseWrittenBytes(localWrittenBytes, currentTime);
+
+            if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
+                fireMessageSent(session, req);
+            }
+
+            return localWrittenBytes;
+        }
+
+        private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
+                throws Exception {
+            IoBuffer buf = (IoBuffer) req.getMessage();
+            int localWrittenBytes = 0;
+
+            if (buf.hasRemaining()) {
+                int length;
+
+                if (hasFragmentation) {
+                    length = Math.min(buf.remaining(), maxLength);
+                } else {
+                    length = buf.remaining();
+                }
+
+                try {
+                    localWrittenBytes = write(session, buf, length);
+                } catch (IOException ioe) {
+                    // We have had an issue while trying to send data to the
+                    // peer : let's close the session.
+                    buf.free();
+                    session.closeNow();
+                    removeNow(session);
+
+                    return 0;
+                }
+            }
+
+            session.increaseWrittenBytes(localWrittenBytes, currentTime);
+
+            // Now, forward the original message
+            if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
+                // Buffer has been sent, clear the current request.
+                Object originalMessage = req.getOriginalRequest().getMessage();
+
+                if (originalMessage instanceof IoBuffer) {
+                    buf = (IoBuffer) req.getOriginalRequest().getMessage();
+
+                    int pos = buf.position();
+                    buf.reset();
+                    fireMessageSent(session, req);
+                    // And set it back to its position
+                    buf.position(pos);
+                } else {
+                    fireMessageSent(session, req);
+                }
+            }
+
+            return localWrittenBytes;
+        }
+
+        private boolean removeNow(S session) {
+            clearWriteRequestQueue(session);
+
+            try {
+                destroy(session);
+                return true;
+            } catch (Exception e) {
+                IoFilterChain filterChain = session.getFilterChain();
+                filterChain.fireExceptionCaught(e);
+            } finally {
+                try {
+                    clearWriteRequestQueue(session);
+                    ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
+                } catch (Exception e) {
+                    // The session was either destroyed or not at this point.
+                    // We do not want any exception thrown from this "cleanup" code
+                    // to change
+                    // the return value by bubbling up.
+                    IoFilterChain filterChain = session.getFilterChain();
+                    filterChain.fireExceptionCaught(e);
+                }
+            }
+
+            return false;
+        }
+
+        private void clearWriteRequestQueue(S session) {
+            WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
+            WriteRequest req;
+
+            List<WriteRequest> failedRequests = new ArrayList<>();
+
+            if ((req = writeRequestQueue.poll(session)) != null) {
+                Object message = req.getMessage();
+
+                if (message instanceof IoBuffer) {
+                    IoBuffer buf = (IoBuffer) message;
+
+                    // The first unwritten empty buffer must be
+                    // forwarded to the filter chain.
+                    if (buf.hasRemaining()) {
+                        buf.reset();
+                        failedRequests.add(req);
+                    } else {
+                        IoFilterChain filterChain = session.getFilterChain();
+                        filterChain.fireMessageSent(req);
+                    }
+                } else {
+                    failedRequests.add(req);
+                }
+
+                // Discard others.
+                while ((req = writeRequestQueue.poll(session)) != null) {
+                    failedRequests.add(req);
+                }
+            }
+
+            // Create an exception and notify.
+            if (!failedRequests.isEmpty()) {
+                WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
+
+                for (WriteRequest r : failedRequests) {
+                    session.decreaseScheduledBytesAndMessages(r);
+                    r.getFuture().setException(cause);
+                }
+
+                IoFilterChain filterChain = session.getFilterChain();
+                filterChain.fireExceptionCaught(cause);
+            }
+        }
+
+        private void fireMessageSent(S session, WriteRequest req) {
+            session.setCurrentWriteRequest(null);
+            IoFilterChain filterChain = session.getFilterChain();
+            filterChain.fireMessageSent(req);
+        }
+        
+        private void process() throws Exception {
+            for (Iterator<S> i = selectedSessions(); i.hasNext();) {
+                S session = i.next();
+                process(session);
+                i.remove();
+            }
+        }
+
+        /**
+         * Deal with session ready for the read or write operations, or both.
+         */
+        private void process(S session) {
+            // Process Reads
+            if (isReadable(session) && !session.isReadSuspended()) {
+                read(session);
+            }
+
+            // Process writes
+            if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) {
+                // add the session to the queue, if it's not already there
+                flushingSessions.add(session);
+            }
+        }
     }
 }


[3/4] mina git commit: Fixed some Sonarlint warnings

Posted by el...@apache.org.
Fixed some Sonarlint warnings

Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/c87701fb
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/c87701fb
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/c87701fb

Branch: refs/heads/2.0
Commit: c87701fb188cef67fa500a17780a107f6c0a456e
Parents: 37239fd
Author: Emmanuel Lécharny <el...@symas.com>
Authored: Wed Dec 7 11:43:43 2016 +0100
Committer: Emmanuel Lécharny <el...@symas.com>
Committed: Wed Dec 7 11:43:43 2016 +0100

----------------------------------------------------------------------
 .../socket/AbstractDatagramSessionConfig.java   |  2 ++
 .../mina/transport/socket/DatagramAcceptor.java |  3 +++
 .../transport/socket/DatagramConnector.java     |  2 ++
 .../socket/DefaultDatagramSessionConfig.java    | 26 +++++++++++++++++++-
 .../mina/transport/socket/SocketAcceptor.java   |  3 +++
 .../mina/transport/socket/SocketConnector.java  |  2 ++
 .../mina/transport/socket/nio/NioSession.java   |  6 +++++
 .../socket/nio/NioSocketConnector.java          |  6 +++++
 .../transport/socket/nio/NioSocketSession.java  |  1 +
 9 files changed, 50 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina/blob/c87701fb/mina-core/src/main/java/org/apache/mina/transport/socket/AbstractDatagramSessionConfig.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/AbstractDatagramSessionConfig.java b/mina-core/src/main/java/org/apache/mina/transport/socket/AbstractDatagramSessionConfig.java
index 67daf74..0ef1b9f 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/AbstractDatagramSessionConfig.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/AbstractDatagramSessionConfig.java
@@ -136,6 +136,7 @@ public abstract class AbstractDatagramSessionConfig extends AbstractIoSessionCon
     /**
      * {@inheritDoc}
      */
+    @Override
     public boolean isCloseOnPortUnreachable() {
         return closeOnPortUnreachable;
     }
@@ -143,6 +144,7 @@ public abstract class AbstractDatagramSessionConfig extends AbstractIoSessionCon
     /**
      * {@inheritDoc}
      */
+    @Override
     public void setCloseOnPortUnreachable(boolean closeOnPortUnreachable) {
         this.closeOnPortUnreachable = closeOnPortUnreachable;
     }

http://git-wip-us.apache.org/repos/asf/mina/blob/c87701fb/mina-core/src/main/java/org/apache/mina/transport/socket/DatagramAcceptor.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/DatagramAcceptor.java b/mina-core/src/main/java/org/apache/mina/transport/socket/DatagramAcceptor.java
index fba0319..bc0c687 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/DatagramAcceptor.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/DatagramAcceptor.java
@@ -38,12 +38,14 @@ public interface DatagramAcceptor extends IoAcceptor {
      * necessarily the firstly bound address.
      * This method overrides the {@link IoAcceptor#getLocalAddress()} method.
      */
+    @Override
     InetSocketAddress getLocalAddress();
 
     /**
      * @return a {@link Set} of the local InetSocketAddress which are bound currently.
      * This method overrides the {@link IoAcceptor#getDefaultLocalAddress()} method.
      */
+    @Override
     InetSocketAddress getDefaultLocalAddress();
 
     /**
@@ -72,5 +74,6 @@ public interface DatagramAcceptor extends IoAcceptor {
      * @return the default Datagram configuration of the new {@link IoSession}s
      * created by this service.
      */
+    @Override
     DatagramSessionConfig getSessionConfig();
 }

http://git-wip-us.apache.org/repos/asf/mina/blob/c87701fb/mina-core/src/main/java/org/apache/mina/transport/socket/DatagramConnector.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/DatagramConnector.java b/mina-core/src/main/java/org/apache/mina/transport/socket/DatagramConnector.java
index 02e5249..15ee056 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/DatagramConnector.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/DatagramConnector.java
@@ -34,12 +34,14 @@ public interface DatagramConnector extends IoConnector {
      * is specified in {@link #connect()} method.
      * This method overrides the {@link IoConnector#getDefaultRemoteAddress()} method.
      */
+    @Override
     InetSocketAddress getDefaultRemoteAddress();
 
     /**
      * @return the default configuration of the new FatagramSessions created by 
      * this connect service.
      */
+    @Override
     DatagramSessionConfig getSessionConfig();
     
     /**

http://git-wip-us.apache.org/repos/asf/mina/blob/c87701fb/mina-core/src/main/java/org/apache/mina/transport/socket/DefaultDatagramSessionConfig.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/DefaultDatagramSessionConfig.java b/mina-core/src/main/java/org/apache/mina/transport/socket/DefaultDatagramSessionConfig.java
index 843e893..198d479 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/DefaultDatagramSessionConfig.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/DefaultDatagramSessionConfig.java
@@ -59,6 +59,7 @@ public class DefaultDatagramSessionConfig extends AbstractDatagramSessionConfig
     /**
      * @see DatagramSocket#getBroadcast()
      */
+    @Override
     public boolean isBroadcast() {
         return broadcast;
     }
@@ -66,6 +67,7 @@ public class DefaultDatagramSessionConfig extends AbstractDatagramSessionConfig
     /**
      * @see DatagramSocket#setBroadcast(boolean)
      */
+    @Override
     public void setBroadcast(boolean broadcast) {
         this.broadcast = broadcast;
     }
@@ -73,6 +75,7 @@ public class DefaultDatagramSessionConfig extends AbstractDatagramSessionConfig
     /**
      * @see DatagramSocket#getReuseAddress()
      */
+    @Override
     public boolean isReuseAddress() {
         return reuseAddress;
     }
@@ -80,6 +83,7 @@ public class DefaultDatagramSessionConfig extends AbstractDatagramSessionConfig
     /**
      * @see DatagramSocket#setReuseAddress(boolean)
      */
+    @Override
     public void setReuseAddress(boolean reuseAddress) {
         this.reuseAddress = reuseAddress;
     }
@@ -87,6 +91,7 @@ public class DefaultDatagramSessionConfig extends AbstractDatagramSessionConfig
     /**
      * @see DatagramSocket#getReceiveBufferSize()
      */
+    @Override
     public int getReceiveBufferSize() {
         return receiveBufferSize;
     }
@@ -94,6 +99,7 @@ public class DefaultDatagramSessionConfig extends AbstractDatagramSessionConfig
     /**
      * @see DatagramSocket#setReceiveBufferSize(int)
      */
+    @Override
     public void setReceiveBufferSize(int receiveBufferSize) {
         this.receiveBufferSize = receiveBufferSize;
     }
@@ -101,6 +107,7 @@ public class DefaultDatagramSessionConfig extends AbstractDatagramSessionConfig
     /**
      * @see DatagramSocket#getSendBufferSize()
      */
+    @Override
     public int getSendBufferSize() {
         return sendBufferSize;
     }
@@ -108,6 +115,7 @@ public class DefaultDatagramSessionConfig extends AbstractDatagramSessionConfig
     /**
      * @see DatagramSocket#setSendBufferSize(int)
      */
+    @Override
     public void setSendBufferSize(int sendBufferSize) {
         this.sendBufferSize = sendBufferSize;
     }
@@ -115,6 +123,7 @@ public class DefaultDatagramSessionConfig extends AbstractDatagramSessionConfig
     /**
      * @see DatagramSocket#getTrafficClass()
      */
+    @Override
     public int getTrafficClass() {
         return trafficClass;
     }
@@ -122,33 +131,48 @@ public class DefaultDatagramSessionConfig extends AbstractDatagramSessionConfig
     /**
      * @see DatagramSocket#setTrafficClass(int)
      */
+    @Override
     public void setTrafficClass(int trafficClass) {
         this.trafficClass = trafficClass;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     protected boolean isBroadcastChanged() {
         return broadcast != DEFAULT_BROADCAST;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     protected boolean isReceiveBufferSizeChanged() {
         return receiveBufferSize != DEFAULT_RECEIVE_BUFFER_SIZE;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     protected boolean isReuseAddressChanged() {
         return reuseAddress != DEFAULT_REUSE_ADDRESS;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     protected boolean isSendBufferSizeChanged() {
         return sendBufferSize != DEFAULT_SEND_BUFFER_SIZE;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     protected boolean isTrafficClassChanged() {
         return trafficClass != DEFAULT_TRAFFIC_CLASS;
     }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina/blob/c87701fb/mina-core/src/main/java/org/apache/mina/transport/socket/SocketAcceptor.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/SocketAcceptor.java b/mina-core/src/main/java/org/apache/mina/transport/socket/SocketAcceptor.java
index 86d1f47..b21f79c 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/SocketAcceptor.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/SocketAcceptor.java
@@ -38,12 +38,14 @@ public interface SocketAcceptor extends IoAcceptor {
      * necessarily the firstly bound address.
      * This method overrides the {@link IoAcceptor#getLocalAddress()} method.
      */
+    @Override
     InetSocketAddress getLocalAddress();
 
     /**
      * @return a {@link Set} of the local InetSocketAddress which are bound currently.
      * This method overrides the {@link IoAcceptor#getDefaultLocalAddress()} method.
      */
+    @Override
     InetSocketAddress getDefaultLocalAddress();
 
     /**
@@ -87,5 +89,6 @@ public interface SocketAcceptor extends IoAcceptor {
      * @return the default configuration of the new SocketSessions created by 
      * this acceptor service.
      */
+    @Override
     SocketSessionConfig getSessionConfig();
 }

http://git-wip-us.apache.org/repos/asf/mina/blob/c87701fb/mina-core/src/main/java/org/apache/mina/transport/socket/SocketConnector.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/SocketConnector.java b/mina-core/src/main/java/org/apache/mina/transport/socket/SocketConnector.java
index f91b188..0254c55 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/SocketConnector.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/SocketConnector.java
@@ -34,12 +34,14 @@ public interface SocketConnector extends IoConnector {
      * is specified in {@link #connect()} method.
      * This method overrides the {@link IoConnector#getDefaultRemoteAddress()} method.
      */
+    @Override
     InetSocketAddress getDefaultRemoteAddress();
 
     /**
      * @return the default configuration of the new SocketSessions created by 
      * this connect service.
      */
+    @Override
     SocketSessionConfig getSessionConfig();
     
     /**

http://git-wip-us.apache.org/repos/asf/mina/blob/c87701fb/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSession.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSession.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSession.java
index bc80d9c..97245cf 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSession.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSession.java
@@ -70,6 +70,10 @@ public abstract class NioSession extends AbstractIoSession {
      */
     abstract ByteChannel getChannel();
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public IoFilterChain getFilterChain() {
         return filterChain;
     }
@@ -93,6 +97,7 @@ public abstract class NioSession extends AbstractIoSession {
     /**
      * {@inheritDoc}
      */
+    @Override
     public IoProcessor<NioSession> getProcessor() {
         return processor;
     }
@@ -100,6 +105,7 @@ public abstract class NioSession extends AbstractIoSession {
     /**
      * {@inheritDoc}
      */
+    @Override
     public final boolean isActive() {
         return key.isValid();
     }

http://git-wip-us.apache.org/repos/asf/mina/blob/c87701fb/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java
index bd1cf00..63313d7 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java
@@ -142,6 +142,7 @@ SocketConnector {
     /**
      * {@inheritDoc}
      */
+    @Override
     public TransportMetadata getTransportMetadata() {
         return NioSocketSession.METADATA;
     }
@@ -149,6 +150,7 @@ SocketConnector {
     /**
      * {@inheritDoc}
      */
+    @Override
     public SocketSessionConfig getSessionConfig() {
         return (SocketSessionConfig) sessionConfig;
     }
@@ -164,6 +166,7 @@ SocketConnector {
     /**
      * {@inheritDoc}
      */
+    @Override
     public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
         super.setDefaultRemoteAddress(defaultRemoteAddress);
     }
@@ -316,6 +319,7 @@ SocketConnector {
         /**
          * {@inheritDoc}
          */
+        @Override
         public boolean hasNext() {
             return i.hasNext();
         }
@@ -323,6 +327,7 @@ SocketConnector {
         /**
          * {@inheritDoc}
          */
+        @Override
         public SocketChannel next() {
             SelectionKey key = i.next();
             return (SocketChannel) key.channel();
@@ -331,6 +336,7 @@ SocketConnector {
         /**
          * {@inheritDoc}
          */
+        @Override
         public void remove() {
             i.remove();
         }

http://git-wip-us.apache.org/repos/asf/mina/blob/c87701fb/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketSession.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketSession.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketSession.java
index 8948c55..8bae465 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketSession.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketSession.java
@@ -352,6 +352,7 @@ class NioSocketSession extends NioSession {
     /**
      * {@inheritDoc}
      */
+    @Override
     public final boolean isSecured() {
         // If the session does not have a SslFilter, we can return false
         IoFilterChain chain = getFilterChain();


[4/4] mina git commit: o Added some missing Javadoc o Fixed some SonarLint issues

Posted by el...@apache.org.
o Added some missing Javadoc
o Fixed some SonarLint issues

Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/7c080890
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/7c080890
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/7c080890

Branch: refs/heads/2.0
Commit: 7c080890b86005d743903552a53a51db4ad3ee13
Parents: c87701f
Author: Emmanuel Lécharny <el...@symas.com>
Authored: Wed Dec 7 19:28:49 2016 +0100
Committer: Emmanuel Lécharny <el...@symas.com>
Committed: Wed Dec 7 19:28:49 2016 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/mina/core/IoUtil.java  | 67 +++++++++++++---
 .../apache/mina/core/RuntimeIoException.java    | 19 +++++
 .../mina/core/write/DefaultWriteRequest.java    | 84 ++++++++++++++++++++
 .../core/write/NothingWrittenException.java     | 56 ++++++++++++-
 .../apache/mina/core/write/WriteException.java  |  4 +-
 .../mina/core/write/WriteRequestWrapper.java    |  5 ++
 .../mina/core/write/WriteTimeoutException.java  | 56 ++++++++++++-
 .../write/WriteToClosedSessionException.java    | 57 ++++++++++++-
 8 files changed, 325 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina/blob/7c080890/mina-core/src/main/java/org/apache/mina/core/IoUtil.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/IoUtil.java b/mina-core/src/main/java/org/apache/mina/core/IoUtil.java
index 98def63..b9711bd 100644
--- a/mina-core/src/main/java/org/apache/mina/core/IoUtil.java
+++ b/mina-core/src/main/java/org/apache/mina/core/IoUtil.java
@@ -39,6 +39,10 @@ import org.apache.mina.core.session.IoSession;
 public final class IoUtil {
     private static final IoSession[] EMPTY_SESSIONS = new IoSession[0];
 
+    private IoUtil() {
+        // Do nothing
+    }
+
     /**
      * Writes the specified {@code message} to the specified {@code sessions}.
      * If the specified {@code message} is an {@link IoBuffer}, the buffer is
@@ -49,7 +53,7 @@ public final class IoUtil {
      * @return The list of WriteFuture created for each broadcasted message
      */
     public static List<WriteFuture> broadcast(Object message, Collection<IoSession> sessions) {
-        List<WriteFuture> answer = new ArrayList<WriteFuture>(sessions.size());
+        List<WriteFuture> answer = new ArrayList<>(sessions.size());
         broadcast(message, sessions.iterator(), answer);
         return answer;
     }
@@ -64,7 +68,7 @@ public final class IoUtil {
      * @return The list of WriteFuture created for each broadcasted message
      */
     public static List<WriteFuture> broadcast(Object message, Iterable<IoSession> sessions) {
-        List<WriteFuture> answer = new ArrayList<WriteFuture>();
+        List<WriteFuture> answer = new ArrayList<>();
         broadcast(message, sessions.iterator(), answer);
         return answer;
     }
@@ -79,7 +83,7 @@ public final class IoUtil {
      * @return The list of {@link WriteFuture} for the written messages
      */
     public static List<WriteFuture> broadcast(Object message, Iterator<IoSession> sessions) {
-        List<WriteFuture> answer = new ArrayList<WriteFuture>();
+        List<WriteFuture> answer = new ArrayList<>();
         broadcast(message, sessions, answer);
         return answer;
     }
@@ -98,7 +102,7 @@ public final class IoUtil {
             sessions = EMPTY_SESSIONS;
         }
 
-        List<WriteFuture> answer = new ArrayList<WriteFuture>(sessions.length);
+        List<WriteFuture> answer = new ArrayList<>(sessions.length);
         if (message instanceof IoBuffer) {
             for (IoSession s : sessions) {
                 answer.add(s.write(((IoBuffer) message).duplicate()));
@@ -125,31 +129,78 @@ public final class IoUtil {
         }
     }
 
+    /**
+     * Wait on all the {@link IoFuture}s we get, or until one of the {@link IoFuture}s is interrupted
+     *  
+     * @param futures The {@link IoFuture}s we are waiting on
+     * @throws InterruptedException If one of the {@link IoFuture} is interrupted
+     */
     public static void await(Iterable<? extends IoFuture> futures) throws InterruptedException {
         for (IoFuture f : futures) {
             f.await();
         }
     }
 
+    /**
+     * Wait on all the {@link IoFuture}s we get. This can't get interrupted.
+     *  
+     * @param futures The {@link IoFuture}s we are waiting on
+     */
     public static void awaitUninterruptably(Iterable<? extends IoFuture> futures) {
         for (IoFuture f : futures) {
             f.awaitUninterruptibly();
         }
     }
 
+    /**
+     * Wait on all the {@link IoFuture}s we get, or until one of the {@link IoFuture}s is interrupted
+     *  
+     * @param futures The {@link IoFuture}s we are waiting on 
+     * @param timeout The maximum time we wait for the {@link IoFuture}s to complete
+     * @param unit The Time unit to use for the timeout
+     * @return <tt>TRUE</TT> if all the {@link IoFuture} have been completed, <tt>FALSE</tt> if
+     * at least one {@link IoFuture} has been interrupted
+     * @throws InterruptedException If one of the {@link IoFuture} is interrupted
+     */
     public static boolean await(Iterable<? extends IoFuture> futures, long timeout, TimeUnit unit)
             throws InterruptedException {
         return await(futures, unit.toMillis(timeout));
     }
 
+    /**
+     * Wait on all the {@link IoFuture}s we get, or until one of the {@link IoFuture}s is interrupted
+     *  
+     * @param futures The {@link IoFuture}s we are waiting on 
+     * @param timeoutMillis The maximum milliseconds we wait for the {@link IoFuture}s to complete
+     * @return <tt>TRUE</TT> if all the {@link IoFuture} have been completed, <tt>FALSE</tt> if
+     * at least one {@link IoFuture} has been interrupted
+     * @throws InterruptedException If one of the {@link IoFuture} is interrupted
+     */
     public static boolean await(Iterable<? extends IoFuture> futures, long timeoutMillis) throws InterruptedException {
         return await0(futures, timeoutMillis, true);
     }
 
+    /**
+     * Wait on all the {@link IoFuture}s we get.
+     *  
+     * @param futures The {@link IoFuture}s we are waiting on 
+     * @param timeout The maximum time we wait for the {@link IoFuture}s to complete
+     * @param unit The Time unit to use for the timeout
+     * @return <tt>TRUE</TT> if all the {@link IoFuture} have been completed, <tt>FALSE</tt> if
+     * at least one {@link IoFuture} has been interrupted
+     */
     public static boolean awaitUninterruptibly(Iterable<? extends IoFuture> futures, long timeout, TimeUnit unit) {
         return awaitUninterruptibly(futures, unit.toMillis(timeout));
     }
 
+    /**
+     * Wait on all the {@link IoFuture}s we get.
+     *  
+     * @param futures The {@link IoFuture}s we are waiting on 
+     * @param timeoutMillis The maximum milliseconds we wait for the {@link IoFuture}s to complete
+     * @return <tt>TRUE</TT> if all the {@link IoFuture} have been completed, <tt>FALSE</tt> if
+     * at least one {@link IoFuture} has been interrupted
+     */
     public static boolean awaitUninterruptibly(Iterable<? extends IoFuture> futures, long timeoutMillis) {
         try {
             return await0(futures, timeoutMillis, false);
@@ -165,8 +216,10 @@ public final class IoUtil {
 
         boolean lastComplete = true;
         Iterator<? extends IoFuture> i = futures.iterator();
+        
         while (i.hasNext()) {
             IoFuture f = i.next();
+
             do {
                 if (interruptable) {
                     lastComplete = f.await(waitTime);
@@ -176,7 +229,7 @@ public final class IoUtil {
 
                 waitTime = timeoutMillis - (System.currentTimeMillis() - startTime);
 
-                if (lastComplete || waitTime <= 0) {
+                if (waitTime <= 0) {
                     break;
                 }
             } while (!lastComplete);
@@ -188,8 +241,4 @@ public final class IoUtil {
 
         return lastComplete && !i.hasNext();
     }
-
-    private IoUtil() {
-        // Do nothing
-    }
 }

http://git-wip-us.apache.org/repos/asf/mina/blob/7c080890/mina-core/src/main/java/org/apache/mina/core/RuntimeIoException.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/RuntimeIoException.java b/mina-core/src/main/java/org/apache/mina/core/RuntimeIoException.java
index b014b24..88a4b3d 100644
--- a/mina-core/src/main/java/org/apache/mina/core/RuntimeIoException.java
+++ b/mina-core/src/main/java/org/apache/mina/core/RuntimeIoException.java
@@ -33,18 +33,37 @@ import java.io.IOException;
 public class RuntimeIoException extends RuntimeException {
     private static final long serialVersionUID = 9029092241311939548L;
 
+    /**
+     * Create a new RuntimeIoException instance
+     */
     public RuntimeIoException() {
         super();
     }
 
+    /**
+     * Create a new RuntimeIoException instance
+     * 
+     * @param message The error message
+     */
     public RuntimeIoException(String message) {
         super(message);
     }
 
+    /**
+     * Create a new RuntimeIoException instance
+     * 
+     * @param message The error message
+     * @param cause The original exception
+     */
     public RuntimeIoException(String message, Throwable cause) {
         super(message, cause);
     }
 
+    /**
+     * Create a new RuntimeIoException instance
+     * 
+     * @param cause The original exception
+     */
     public RuntimeIoException(Throwable cause) {
         super(cause);
     }

http://git-wip-us.apache.org/repos/asf/mina/blob/7c080890/mina-core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java b/mina-core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java
index f03bbd7..1d1e5fb 100644
--- a/mina-core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java
+++ b/mina-core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java
@@ -37,66 +37,130 @@ public class DefaultWriteRequest implements WriteRequest {
 
     /** An empty FUTURE */
     private static final WriteFuture UNUSED_FUTURE = new WriteFuture() {
+        /**
+         * {@inheritDoc}
+         */
+        @Override
         public boolean isWritten() {
             return false;
         }
 
+        /**
+         * {@inheritDoc}
+         */
+        @Override
         public void setWritten() {
             // Do nothing
         }
 
+        /**
+         * {@inheritDoc}
+         */
+        @Override
         public IoSession getSession() {
             return null;
         }
 
+        /**
+         * {@inheritDoc}
+         */
+        @Override
         public void join() {
             // Do nothing
         }
 
+        /**
+         * {@inheritDoc}
+         */
+        @Override
         public boolean join(long timeoutInMillis) {
             return true;
         }
 
+        /**
+         * {@inheritDoc}
+         */
+        @Override
         public boolean isDone() {
             return true;
         }
 
+        /**
+         * {@inheritDoc}
+         */
+        @Override
         public WriteFuture addListener(IoFutureListener<?> listener) {
             throw new IllegalStateException("You can't add a listener to a dummy future.");
         }
 
+        /**
+         * {@inheritDoc}
+         */
+        @Override
         public WriteFuture removeListener(IoFutureListener<?> listener) {
             throw new IllegalStateException("You can't add a listener to a dummy future.");
         }
 
+        /**
+         * {@inheritDoc}
+         */
+        @Override
         public WriteFuture await() throws InterruptedException {
             return this;
         }
 
+        /**
+         * {@inheritDoc}
+         */
+        @Override
         public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
             return true;
         }
 
+        /**
+         * {@inheritDoc}
+         */
+        @Override
         public boolean await(long timeoutMillis) throws InterruptedException {
             return true;
         }
 
+        /**
+         * {@inheritDoc}
+         */
+        @Override
         public WriteFuture awaitUninterruptibly() {
             return this;
         }
 
+        /**
+         * {@inheritDoc}
+         */
+        @Override
         public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
             return true;
         }
 
+        /**
+         * {@inheritDoc}
+         */
+        @Override
         public boolean awaitUninterruptibly(long timeoutMillis) {
             return true;
         }
 
+        /**
+         * {@inheritDoc}
+         */
+        @Override
         public Throwable getException() {
             return null;
         }
 
+        /**
+         * {@inheritDoc}
+         */
+        @Override
         public void setException(Throwable cause) {
             // Do nothing
         }
@@ -151,18 +215,34 @@ public class DefaultWriteRequest implements WriteRequest {
         this.destination = destination;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public WriteFuture getFuture() {
         return future;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public Object getMessage() {
         return message;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public WriteRequest getOriginalRequest() {
         return this;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public SocketAddress getDestination() {
         return destination;
     }
@@ -190,6 +270,10 @@ public class DefaultWriteRequest implements WriteRequest {
         return sb.toString();
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public boolean isEncoded() {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/mina/blob/7c080890/mina-core/src/main/java/org/apache/mina/core/write/NothingWrittenException.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/write/NothingWrittenException.java b/mina-core/src/main/java/org/apache/mina/core/write/NothingWrittenException.java
index 66e228e..6ccf9c7 100644
--- a/mina-core/src/main/java/org/apache/mina/core/write/NothingWrittenException.java
+++ b/mina-core/src/main/java/org/apache/mina/core/write/NothingWrittenException.java
@@ -31,34 +31,82 @@ public class NothingWrittenException extends WriteException {
 
     private static final long serialVersionUID = -6331979307737691005L;
 
+    /**
+     * Create a new NothingWrittenException instance
+     * 
+     * @param requests The {@link WriteRequest}s that haven't been written
+     * @param message The error message
+     * @param cause The original exception
+     */
     public NothingWrittenException(Collection<WriteRequest> requests, String message, Throwable cause) {
         super(requests, message, cause);
     }
 
-    public NothingWrittenException(Collection<WriteRequest> requests, String s) {
-        super(requests, s);
+    /**
+     * Create a new NothingWrittenException instance
+     * 
+     * @param requests The {@link WriteRequest}s that haven't been written
+     * @param message The error message
+     */
+    public NothingWrittenException(Collection<WriteRequest> requests, String message) {
+        super(requests, message);
     }
 
+    /**
+     * Create a new NothingWrittenException instance
+     * 
+     * @param requests The {@link WriteRequest} that haven't been written
+     * @param cause The original exception
+     */
     public NothingWrittenException(Collection<WriteRequest> requests, Throwable cause) {
         super(requests, cause);
     }
 
+    /**
+     * Create a new NothingWrittenException instance
+     * 
+     * @param requests The {@link WriteRequest} that haven't been written
+     */
     public NothingWrittenException(Collection<WriteRequest> requests) {
         super(requests);
     }
 
+    /**
+     * Create a new NothingWrittenException instance
+     * 
+     * @param request The {@link WriteRequest} that hasn't been written
+     * @param message The error message
+     * @param cause The original exception
+     */
     public NothingWrittenException(WriteRequest request, String message, Throwable cause) {
         super(request, message, cause);
     }
 
-    public NothingWrittenException(WriteRequest request, String s) {
-        super(request, s);
+    /**
+     * Create a new NothingWrittenException instance
+     * 
+     * @param request The {@link WriteRequest} that hasn't been written
+     * @param message The error message
+     */
+    public NothingWrittenException(WriteRequest request, String message) {
+        super(request, message);
     }
 
+    /**
+     * Create a new NothingWrittenException instance
+     * 
+     * @param request The {@link WriteRequest} that hasn't been written
+     * @param cause The original exception
+     */
     public NothingWrittenException(WriteRequest request, Throwable cause) {
         super(request, cause);
     }
 
+    /**
+     * Create a new NothingWrittenException instance
+     * 
+     * @param request The {@link WriteRequest} that hasn't been written
+     */
     public NothingWrittenException(WriteRequest request) {
         super(request);
     }

http://git-wip-us.apache.org/repos/asf/mina/blob/7c080890/mina-core/src/main/java/org/apache/mina/core/write/WriteException.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/write/WriteException.java b/mina-core/src/main/java/org/apache/mina/core/write/WriteException.java
index 193432b..97acbb4 100644
--- a/mina-core/src/main/java/org/apache/mina/core/write/WriteException.java
+++ b/mina-core/src/main/java/org/apache/mina/core/write/WriteException.java
@@ -155,7 +155,7 @@ public class WriteException extends IOException {
         }
 
         // Create a list of requests removing duplicates.
-        Set<WriteRequest> newRequests = new MapBackedSet<WriteRequest>(new LinkedHashMap<WriteRequest, Boolean>());
+        Set<WriteRequest> newRequests = new MapBackedSet<>(new LinkedHashMap<WriteRequest, Boolean>());
         
         for (WriteRequest r : requests) {
             newRequests.add(r.getOriginalRequest());
@@ -169,7 +169,7 @@ public class WriteException extends IOException {
             throw new IllegalArgumentException("request");
         }
 
-        List<WriteRequest> requests = new ArrayList<WriteRequest>(1);
+        List<WriteRequest> requests = new ArrayList<>(1);
         requests.add(request.getOriginalRequest());
         
         return Collections.unmodifiableList(requests);

http://git-wip-us.apache.org/repos/asf/mina/blob/7c080890/mina-core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java b/mina-core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java
index 0941d43..0abea69 100644
--- a/mina-core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java
+++ b/mina-core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java
@@ -47,6 +47,7 @@ public class WriteRequestWrapper implements WriteRequest {
     /**
      * {@inheritDoc}
      */
+    @Override
     public SocketAddress getDestination() {
         return parentRequest.getDestination();
     }
@@ -54,6 +55,7 @@ public class WriteRequestWrapper implements WriteRequest {
     /**
      * {@inheritDoc}
      */
+    @Override
     public WriteFuture getFuture() {
         return parentRequest.getFuture();
     }
@@ -61,6 +63,7 @@ public class WriteRequestWrapper implements WriteRequest {
     /**
      * {@inheritDoc}
      */
+    @Override
     public Object getMessage() {
         return parentRequest.getMessage();
     }
@@ -68,6 +71,7 @@ public class WriteRequestWrapper implements WriteRequest {
     /**
      * {@inheritDoc}
      */
+    @Override
     public WriteRequest getOriginalRequest() {
         return parentRequest.getOriginalRequest();
     }
@@ -90,6 +94,7 @@ public class WriteRequestWrapper implements WriteRequest {
     /**
      * {@inheritDoc}
      */
+    @Override
     public boolean isEncoded() {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/mina/blob/7c080890/mina-core/src/main/java/org/apache/mina/core/write/WriteTimeoutException.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/write/WriteTimeoutException.java b/mina-core/src/main/java/org/apache/mina/core/write/WriteTimeoutException.java
index 9ee214c..3fd3e60 100644
--- a/mina-core/src/main/java/org/apache/mina/core/write/WriteTimeoutException.java
+++ b/mina-core/src/main/java/org/apache/mina/core/write/WriteTimeoutException.java
@@ -32,34 +32,82 @@ import org.apache.mina.core.session.IoSessionConfig;
 public class WriteTimeoutException extends WriteException {
     private static final long serialVersionUID = 3906931157944579121L;
 
+    /**
+     * Create a new WriteTimeoutException instance
+     * 
+     * @param requests The {@link WriteRequest}s for which we have had a timeout
+     * @param message The error message
+     * @param cause The original exception
+     */
     public WriteTimeoutException(Collection<WriteRequest> requests, String message, Throwable cause) {
         super(requests, message, cause);
     }
 
-    public WriteTimeoutException(Collection<WriteRequest> requests, String s) {
-        super(requests, s);
+    /**
+     * Create a new WriteTimeoutException instance
+     * 
+     * @param requests The {@link WriteRequest}s for which we have had a timeout
+     * @param message The error message
+     */
+    public WriteTimeoutException(Collection<WriteRequest> requests, String message) {
+        super(requests, message);
     }
 
+    /**
+     * Create a new WriteTimeoutException instance
+     * 
+     * @param requests The {@link WriteRequest}s for which we have had a timeout
+     * @param cause The original exception
+     */
     public WriteTimeoutException(Collection<WriteRequest> requests, Throwable cause) {
         super(requests, cause);
     }
 
+    /**
+     * Create a new WriteTimeoutException instance
+     * 
+     * @param requests The {@link WriteRequest}s for which we have had a timeout
+     */
     public WriteTimeoutException(Collection<WriteRequest> requests) {
         super(requests);
     }
 
+    /**
+     * Create a new WriteTimeoutException instance
+     * 
+     * @param request The {@link WriteRequest} for which we have had a timeout
+     * @param message The error message
+     * @param cause The original exception
+     */
     public WriteTimeoutException(WriteRequest request, String message, Throwable cause) {
         super(request, message, cause);
     }
 
-    public WriteTimeoutException(WriteRequest request, String s) {
-        super(request, s);
+    /**
+     * Create a new WriteTimeoutException instance
+     * 
+     * @param request The {@link WriteRequest} for which we have had a timeout
+     * @param message The error message
+     */
+    public WriteTimeoutException(WriteRequest request, String message) {
+        super(request, message);
     }
 
+    /**
+     * Create a new WriteTimeoutException instance
+     * 
+     * @param request The {@link WriteRequest} for which we have had a timeout
+     * @param cause The original exception
+     */
     public WriteTimeoutException(WriteRequest request, Throwable cause) {
         super(request, cause);
     }
 
+    /**
+     * Create a new WriteTimeoutException instance
+     * 
+     * @param request The {@link WriteRequest} for which we have had a timeout
+     */
     public WriteTimeoutException(WriteRequest request) {
         super(request);
     }

http://git-wip-us.apache.org/repos/asf/mina/blob/7c080890/mina-core/src/main/java/org/apache/mina/core/write/WriteToClosedSessionException.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/write/WriteToClosedSessionException.java b/mina-core/src/main/java/org/apache/mina/core/write/WriteToClosedSessionException.java
index 620dbe0..13c240c 100644
--- a/mina-core/src/main/java/org/apache/mina/core/write/WriteToClosedSessionException.java
+++ b/mina-core/src/main/java/org/apache/mina/core/write/WriteToClosedSessionException.java
@@ -31,34 +31,83 @@ public class WriteToClosedSessionException extends WriteException {
 
     private static final long serialVersionUID = 5550204573739301393L;
 
+    /**
+     * Create a new WriteToClosedSessionException instance
+     * 
+     * @param requests The {@link WriteRequest}s which have been written on a closed session
+     * @param message The error message
+     * @param cause The original exception
+     */
     public WriteToClosedSessionException(Collection<WriteRequest> requests, String message, Throwable cause) {
         super(requests, message, cause);
     }
 
-    public WriteToClosedSessionException(Collection<WriteRequest> requests, String s) {
-        super(requests, s);
+    /**
+     * Create a new WriteToClosedSessionException instance
+     * 
+     * @param requests The {@link WriteRequest}s which have been written on a closed session
+     * @param message The error message
+     */
+    public WriteToClosedSessionException(Collection<WriteRequest> requests, String message) {
+        super(requests, message);
     }
 
+    /**
+     * Create a new WriteToClosedSessionException instance
+     * 
+     * @param requests The {@link WriteRequest}s which have been written on a closed session
+     * @param cause The original exception
+     */
     public WriteToClosedSessionException(Collection<WriteRequest> requests, Throwable cause) {
         super(requests, cause);
     }
 
+    /**
+     * Create a new WriteToClosedSessionException instance
+     * 
+     * @param requests The {@link WriteRequest}s which have been written on a closed session
+     */
     public WriteToClosedSessionException(Collection<WriteRequest> requests) {
         super(requests);
     }
 
+    /**
+     * Create a new WriteToClosedSessionException instance
+     * 
+     * @param request The {@link WriteRequest} which has been written on a closed session
+     * @param message The error message
+     * @param cause The original exception
+     */
     public WriteToClosedSessionException(WriteRequest request, String message, Throwable cause) {
         super(request, message, cause);
     }
 
-    public WriteToClosedSessionException(WriteRequest request, String s) {
-        super(request, s);
+    /**
+     * Create a new WriteToClosedSessionException instance
+     * 
+     * @param request The {@link WriteRequest} which has been written on a closed session
+     * @param message The error message
+     */
+    public WriteToClosedSessionException(WriteRequest request, String message) {
+        super(request, message);
     }
 
+    /**
+     * Create a new WriteToClosedSessionException instance
+     * 
+     * @param request The {@link WriteRequest} which has been written on a closed session
+     * @param cause The original exception
+     */
     public WriteToClosedSessionException(WriteRequest request, Throwable cause) {
         super(request, cause);
     }
 
+    /**
+     * Create a new WriteToClosedSessionException instance
+     * 
+     * @param request The {@link WriteRequest} which has been written on a closed session
+
+     */
     public WriteToClosedSessionException(WriteRequest request) {
         super(request);
     }


[2/4] mina git commit: o Added some missing Javadoc o Fixing some Sonarlint warnings

Posted by el...@apache.org.
o Added some missing Javadoc
o Fixing some Sonarlint warnings

Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/37239fd0
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/37239fd0
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/37239fd0

Branch: refs/heads/2.0
Commit: 37239fd01f0483e74e0bfd78abc5aca86dcb48c4
Parents: 9b26714
Author: Emmanuel Lécharny <el...@symas.com>
Authored: Wed Dec 7 11:25:25 2016 +0100
Committer: Emmanuel Lécharny <el...@symas.com>
Committed: Wed Dec 7 11:25:25 2016 +0100

----------------------------------------------------------------------
 .../socket/nio/NioDatagramAcceptor.java         | 26 ++++++---
 .../socket/nio/NioDatagramConnector.java        | 60 +++++++++++++++++++-
 .../socket/nio/NioDatagramSession.java          | 19 +++++++
 .../socket/nio/NioDatagramSessionConfig.java    | 12 +++-
 4 files changed, 105 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina/blob/37239fd0/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
index 1577ff2..d4c0000 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
@@ -81,11 +81,11 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
     private final Semaphore lock = new Semaphore(1);
 
     /** A queue used to store the list of pending Binds */
-    private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
+    private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<>();
 
-    private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
+    private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<>();
 
-    private final Queue<NioSession> flushingSessions = new ConcurrentLinkedQueue<NioSession>();
+    private final Queue<NioSession> flushingSessions = new ConcurrentLinkedQueue<>();
 
     private final Map<SocketAddress, DatagramChannel> boundHandles = Collections
             .synchronizedMap(new HashMap<SocketAddress, DatagramChannel>());
@@ -150,6 +150,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
      * the registered handles have been removed (unbound).
      */
     private class Acceptor implements Runnable {
+        @Override
         public void run() {
             int nHandles = 0;
             lastIdleCheckTime = System.currentTimeMillis();
@@ -220,7 +221,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
                 break;
             }
 
-            Map<SocketAddress, DatagramChannel> newHandles = new HashMap<SocketAddress, DatagramChannel>();
+            Map<SocketAddress, DatagramChannel> newHandles = new HashMap<>();
             List<SocketAddress> localAddresses = req.getLocalAddresses();
 
             try {
@@ -494,6 +495,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
     /**
      * {@inheritDoc}
      */
+    @Override
     public void add(NioSession session) {
         // Nothing to do for UDP
     }
@@ -538,7 +540,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
         // Update the local addresses.
         // setLocalAddresses() shouldn't be called from the worker thread
         // because of deadlock.
-        Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
+        Set<SocketAddress> newLocalAddresses = new HashSet<>();
 
         for (DatagramChannel handle : boundHandles.values()) {
             newLocalAddresses.add(localAddress(handle));
@@ -577,6 +579,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
     /**
      * {@inheritDoc}
      */
+    @Override
     public void flush(NioSession session) {
         if (scheduleFlush(session)) {
             wakeup();
@@ -596,14 +599,17 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
     /**
      * {@inheritDoc}
      */
+    @Override
     public DatagramSessionConfig getSessionConfig() {
         return (DatagramSessionConfig) sessionConfig;
     }
 
+    @Override
     public final IoSessionRecycler getSessionRecycler() {
         return sessionRecycler;
     }
 
+    @Override
     public TransportMetadata getTransportMetadata() {
         return NioDatagramSession.METADATA;
     }
@@ -665,6 +671,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
     /**
      * {@inheritDoc}
      */
+    @Override
     public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
         if (isDisposing()) {
             throw new IllegalStateException("The Acceptor is being disposed.");
@@ -681,9 +688,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
 
             try {
                 return newSessionWithoutLock(remoteAddress, localAddress);
-            } catch (RuntimeException e) {
-                throw e;
-            } catch (Error e) {
+            } catch (RuntimeException | Error e) {
                 throw e;
             } catch (Exception e) {
                 throw new RuntimeIoException("Failed to create a session.", e);
@@ -732,6 +737,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
     /**
      * {@inheritDoc}
      */
+    @Override
     public void remove(NioSession session) {
         getSessionRecycler().remove(session);
         getListeners().fireSessionDestroyed(session);
@@ -753,6 +759,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
         return ((DatagramChannel) session.getChannel()).send(buffer.buf(), remoteAddress);
     }
 
+    @Override
     public void setDefaultLocalAddress(InetSocketAddress localAddress) {
         setDefaultLocalAddress((SocketAddress) localAddress);
     }
@@ -775,6 +782,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
         key.interestOps(newInterestOps);
     }
 
+    @Override
     public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
         synchronized (bindLock) {
             if (isActive()) {
@@ -810,6 +818,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
     /**
      * {@inheritDoc}
      */
+    @Override
     public void updateTrafficControl(NioSession session) {
         throw new UnsupportedOperationException();
     }
@@ -821,6 +830,7 @@ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements Dat
     /**
      * {@inheritDoc}
      */
+    @Override
     public void write(NioSession session, WriteRequest writeRequest) {
         // We will try to write the message directly
         long currentTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/mina/blob/37239fd0/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
index 9da09de..c101448 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
@@ -93,35 +93,56 @@ DatagramConnector {
      * 
      * @param processorClass the processor class.
      * @see SimpleIoProcessorPool#SimpleIoProcessorPool(Class, Executor, int, java.nio.channels.spi.SelectorProvider)
-     * @see org.apache.mina.core.service.SimpleIoProcessorPool#DEFAULT_SIZE
      * @since 2.0.0-M4
      */
     public NioDatagramConnector(Class<? extends IoProcessor<NioSession>> processorClass) {
         super(new DefaultDatagramSessionConfig(), processorClass);
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public TransportMetadata getTransportMetadata() {
         return NioDatagramSession.METADATA;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public DatagramSessionConfig getSessionConfig() {
         return (DatagramSessionConfig) sessionConfig;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public InetSocketAddress getDefaultRemoteAddress() {
         return (InetSocketAddress) super.getDefaultRemoteAddress();
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
         super.setDefaultRemoteAddress(defaultRemoteAddress);
     }
 
+    /**
+    @Override
+     * {@inheritDoc}
+     */
     @Override
     protected void init() throws Exception {
         // Do nothing
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     protected DatagramChannel newHandle(SocketAddress localAddress) throws Exception {
         DatagramChannel ch = DatagramChannel.open();
@@ -155,12 +176,18 @@ DatagramConnector {
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     protected boolean connect(DatagramChannel handle, SocketAddress remoteAddress) throws Exception {
         handle.connect(remoteAddress);
         return true;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     protected NioSession newSession(IoProcessor<NioSession> processor, DatagramChannel handle) {
         NioSession session = new NioDatagramSession(this, handle, processor);
@@ -168,50 +195,77 @@ DatagramConnector {
         return session;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     protected void close(DatagramChannel handle) throws Exception {
         handle.disconnect();
         handle.close();
     }
 
+    /**
+     * {@inheritDoc}
+     */
     // Unused extension points.
     @Override
     @SuppressWarnings("unchecked")
     protected Iterator<DatagramChannel> allHandles() {
-        return Collections.EMPTY_LIST.iterator();
+        return Collections.emptyIterator();
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     protected ConnectionRequest getConnectionRequest(DatagramChannel handle) {
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     protected void destroy() throws Exception {
         // Do nothing
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     protected boolean finishConnect(DatagramChannel handle) throws Exception {
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     protected void register(DatagramChannel handle, ConnectionRequest request) throws Exception {
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     protected int select(int timeout) throws Exception {
         return 0;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     @SuppressWarnings("unchecked")
     protected Iterator<DatagramChannel> selectedHandles() {
-        return Collections.EMPTY_LIST.iterator();
+        return Collections.emptyIterator();
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     protected void wakeup() {
         // Do nothing

http://git-wip-us.apache.org/repos/asf/mina/blob/37239fd0/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
index 9ca2ba3..180ddfc 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
@@ -66,27 +66,46 @@ class NioDatagramSession extends NioSession {
     /**
      * {@inheritDoc}
      */
+    @Override
     public DatagramSessionConfig getConfig() {
         return (DatagramSessionConfig) config;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     DatagramChannel getChannel() {
         return (DatagramChannel) channel;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public TransportMetadata getTransportMetadata() {
         return METADATA;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public InetSocketAddress getRemoteAddress() {
         return remoteAddress;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public InetSocketAddress getLocalAddress() {
         return localAddress;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public InetSocketAddress getServiceAddress() {
         return (InetSocketAddress) super.getServiceAddress();

http://git-wip-us.apache.org/repos/asf/mina/blob/37239fd0/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSessionConfig.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSessionConfig.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSessionConfig.java
index 552feaf..94df01e 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSessionConfig.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSessionConfig.java
@@ -54,6 +54,7 @@ class NioDatagramSessionConfig extends AbstractDatagramSessionConfig {
      * 
      * @see DatagramSocket#getReceiveBufferSize()
      */
+    @Override
     public int getReceiveBufferSize() {
         try {
             return channel.socket().getReceiveBufferSize();
@@ -72,8 +73,9 @@ class NioDatagramSessionConfig extends AbstractDatagramSessionConfig {
      * @throws RuntimeIoException if the socket is closed or if we 
      * had a SocketException
      * 
-     * @see DatagramSocket#setReceiveBufferSize()
+     * @see DatagramSocket#setReceiveBufferSize(int)
      */
+    @Override
     public void setReceiveBufferSize(int receiveBufferSize) {
         try {
             channel.socket().setReceiveBufferSize(receiveBufferSize);
@@ -89,6 +91,7 @@ class NioDatagramSessionConfig extends AbstractDatagramSessionConfig {
      * @throws RuntimeIoException If the socket is closed or if we get an
      * {@link SocketException} 
      */
+    @Override
     public boolean isBroadcast() {
         try {
             return channel.socket().getBroadcast();
@@ -97,6 +100,7 @@ class NioDatagramSessionConfig extends AbstractDatagramSessionConfig {
         }
     }
 
+    @Override
     public void setBroadcast(boolean broadcast) {
         try {
             channel.socket().setBroadcast(broadcast);
@@ -110,6 +114,7 @@ class NioDatagramSessionConfig extends AbstractDatagramSessionConfig {
      * @throws RuntimeIoException If the socket is closed or if we get an
      * {@link SocketException} 
      */
+    @Override
     public int getSendBufferSize() {
         try {
             return channel.socket().getSendBufferSize();
@@ -123,6 +128,7 @@ class NioDatagramSessionConfig extends AbstractDatagramSessionConfig {
      * @throws RuntimeIoException If the socket is closed or if we get an
      * {@link SocketException} 
      */
+    @Override
     public void setSendBufferSize(int sendBufferSize) {
         try {
             channel.socket().setSendBufferSize(sendBufferSize);
@@ -138,6 +144,7 @@ class NioDatagramSessionConfig extends AbstractDatagramSessionConfig {
      * @throws RuntimeIoException If the socket is closed or if we get an
      * {@link SocketException} 
      */
+    @Override
     public boolean isReuseAddress() {
         try {
             return channel.socket().getReuseAddress();
@@ -151,6 +158,7 @@ class NioDatagramSessionConfig extends AbstractDatagramSessionConfig {
      * @throws RuntimeIoException If the socket is closed or if we get an
      * {@link SocketException} 
      */
+    @Override
     public void setReuseAddress(boolean reuseAddress) {
         try {
             channel.socket().setReuseAddress(reuseAddress);
@@ -168,6 +176,7 @@ class NioDatagramSessionConfig extends AbstractDatagramSessionConfig {
      * @throws RuntimeIoException If the socket is closed or if we get an
      * {@link SocketException} 
      */
+    @Override
     public int getTrafficClass() {
         try {
             return channel.socket().getTrafficClass();
@@ -181,6 +190,7 @@ class NioDatagramSessionConfig extends AbstractDatagramSessionConfig {
      * @throws RuntimeIoException If the socket is closed or if we get an
      * {@link SocketException} 
      */
+    @Override
     public void setTrafficClass(int trafficClass) {
         try {
             channel.socket().setTrafficClass(trafficClass);