You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2016/08/19 16:28:17 UTC
[1/2] mina git commit: o Created a Static MESSAGE_SENT_REQUEST which
is used to mark the end of messages in the codec filter. It avoids the
creation of a WriteRequest for every message being sent. o Use this
MESSAGE_SENT_REQUEST in the ProtocolCodecFilter
Repository: mina
Updated Branches:
refs/heads/2.0 12d45d143 -> 109381c94
o Created a Static MESSAGE_SENT_REQUEST which is used to mark the end of
messages in the codec filter. It avoids the creation of a WriteRequest
for every message being sent.
o Use this MESSAGE_SENT_REQUEST in the ProtocolCodecFilter, when we have
encoded a full message
o Changed the way we process the write loop by checking for the presence
of this MESSAGE_SENT_REQUEST instead of using the number of written
bytes.
Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/a4a481d2
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/a4a481d2
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/a4a481d2
Branch: refs/heads/2.0
Commit: a4a481d25963e925a7b94123a83416e3321ecb90
Parents: 12d45d1
Author: Emmanuel Lécharny <el...@symas.com>
Authored: Fri Aug 19 17:13:59 2016 +0200
Committer: Emmanuel Lécharny <el...@symas.com>
Committed: Fri Aug 19 17:13:59 2016 +0200
----------------------------------------------------------------------
.../mina/core/polling/AbstractPollingIoProcessor.java | 13 +++----------
.../apache/mina/core/session/AbstractIoSession.java | 7 +++++++
.../apache/mina/filter/codec/ProtocolCodecFilter.java | 5 ++---
3 files changed, 12 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina/blob/a4a481d2/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
index 5d23b35..c1295f6 100644
--- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
+++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
@@ -814,9 +814,6 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
+ (session.getConfig().getMaxReadBufferSize() >>> 1);
int writtenBytes = 0;
WriteRequest req = null;
-
- // boolean to indicate if the current message is an empty buffer, representing a message marker
- boolean isEmptyMessage = false;
try {
// Clear OP_WRITE
@@ -840,7 +837,6 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
Object message = req.getMessage();
if (message instanceof IoBuffer) {
- isEmptyMessage = !((IoBuffer) message).hasRemaining();
localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
currentTime);
@@ -870,14 +866,11 @@ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> im
}
if (localWrittenBytes == 0) {
- if (isEmptyMessage) {
- // Kernel buffer is full.
+
+ // Kernel buffer is full.
+ if (!req.equals(AbstractIoSession.MESSAGE_SENT_REQUEST)) {
setInterestedInWrite(session, true);
return false;
- } else {
- // Just processed a message marker - empty buffer;
- // set the session write flag and continue
- setInterestedInWrite(session, true);
}
} else {
writtenBytes += localWrittenBytes;
http://git-wip-us.apache.org/repos/asf/mina/blob/a4a481d2/mina-core/src/main/java/org/apache/mina/core/session/AbstractIoSession.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/session/AbstractIoSession.java b/mina-core/src/main/java/org/apache/mina/core/session/AbstractIoSession.java
index 316d978..842859f 100644
--- a/mina-core/src/main/java/org/apache/mina/core/session/AbstractIoSession.java
+++ b/mina-core/src/main/java/org/apache/mina/core/session/AbstractIoSession.java
@@ -97,6 +97,13 @@ public abstract class AbstractIoSession implements IoSession {
*/
public static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(new Object());
+ /**
+ * An internal write request object that triggers message sent events.
+ *
+ * @see #writeRequestQueue
+ */
+ public static final WriteRequest MESSAGE_SENT_REQUEST = new DefaultWriteRequest(DefaultWriteRequest.EMPTY_MESSAGE);
+
private final Object lock = new Object();
private IoSessionAttributeMap attributes;
http://git-wip-us.apache.org/repos/asf/mina/blob/a4a481d2/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
index 18b2ec4..1f47928 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
@@ -29,6 +29,7 @@ import org.apache.mina.core.filterchain.IoFilterAdapter;
import org.apache.mina.core.filterchain.IoFilterChain;
import org.apache.mina.core.future.DefaultWriteFuture;
import org.apache.mina.core.future.WriteFuture;
+import org.apache.mina.core.session.AbstractIoSession;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.write.DefaultWriteRequest;
@@ -435,9 +436,7 @@ public class ProtocolCodecFilter extends IoFilterAdapter {
if (future == null) {
// Creates an empty writeRequest containing the destination
- WriteRequest writeRequest = new DefaultWriteRequest(
- DefaultWriteRequest.EMPTY_MESSAGE, null, destination);
- future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(writeRequest));
+ future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(AbstractIoSession.MESSAGE_SENT_REQUEST));
}
return future;
[2/2] mina git commit: Fixed some Sonar warnings
Posted by el...@apache.org.
Fixed some Sonar warnings
Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/109381c9
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/109381c9
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/109381c9
Branch: refs/heads/2.0
Commit: 109381c94271eb1f68e00da0d88346b961b46e95
Parents: a4a481d
Author: Emmanuel Lécharny <el...@symas.com>
Authored: Fri Aug 19 17:24:07 2016 +0200
Committer: Emmanuel Lécharny <el...@symas.com>
Committed: Fri Aug 19 17:24:07 2016 +0200
----------------------------------------------------------------------
.../core/polling/AbstractPollingIoAcceptor.java | 15 ++++---
.../mina/core/service/AbstractIoAcceptor.java | 47 ++++++++++++++------
2 files changed, 42 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina/blob/109381c9/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
index f9fad07..ee30983 100644
--- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
+++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
@@ -76,9 +76,9 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
private final boolean createdProcessor;
- private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
+ private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<>();
- private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
+ private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<>();
private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());
@@ -88,7 +88,7 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
private volatile boolean selectable;
/** The thread responsible of accepting incoming requests */
- private AtomicReference<Acceptor> acceptorRef = new AtomicReference<Acceptor>();
+ private AtomicReference<Acceptor> acceptorRef = new AtomicReference<>();
protected boolean reuseAddress = false;
@@ -371,7 +371,7 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
// Update the local addresses.
// setLocalAddresses() shouldn't be called from the worker thread
// because of deadlock.
- Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
+ Set<SocketAddress> newLocalAddresses = new HashSet<>();
for (H handle : boundHandles.values()) {
newLocalAddresses.add(localAddress(handle));
@@ -577,7 +577,7 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
// We create a temporary map to store the bound handles,
// as we may have to remove them all if there is an exception
// during the sockets opening.
- Map<SocketAddress, H> newHandles = new ConcurrentHashMap<SocketAddress, H>();
+ Map<SocketAddress, H> newHandles = new ConcurrentHashMap<>();
List<SocketAddress> localAddresses = future.getLocalAddresses();
try {
@@ -609,7 +609,8 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
}
}
- // TODO : add some comment : what is the wakeup() waking up ?
+ // Wake up the selector to be sure we will process the newly bound handle
+ // and not block forever in the select()
wakeup();
}
}
@@ -657,6 +658,7 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
/**
* {@inheritDoc}
*/
+ @Override
public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
throw new UnsupportedOperationException();
}
@@ -710,6 +712,7 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
/**
* {@inheritDoc}
*/
+ @Override
public SocketSessionConfig getSessionConfig() {
return (SocketSessionConfig)sessionConfig;
}
http://git-wip-us.apache.org/repos/asf/mina/blob/109381c9/mina-core/src/main/java/org/apache/mina/core/service/AbstractIoAcceptor.java
----------------------------------------------------------------------
diff --git a/mina-core/src/main/java/org/apache/mina/core/service/AbstractIoAcceptor.java b/mina-core/src/main/java/org/apache/mina/core/service/AbstractIoAcceptor.java
index 18492ef..867fb1d 100644
--- a/mina-core/src/main/java/org/apache/mina/core/service/AbstractIoAcceptor.java
+++ b/mina-core/src/main/java/org/apache/mina/core/service/AbstractIoAcceptor.java
@@ -42,12 +42,12 @@ import org.apache.mina.core.session.IoSessionConfig;
*/
public abstract class AbstractIoAcceptor extends AbstractIoService implements IoAcceptor {
- private final List<SocketAddress> defaultLocalAddresses = new ArrayList<SocketAddress>();
+ private final List<SocketAddress> defaultLocalAddresses = new ArrayList<>();
private final List<SocketAddress> unmodifiableDefaultLocalAddresses = Collections
.unmodifiableList(defaultLocalAddresses);
- private final Set<SocketAddress> boundAddresses = new HashSet<SocketAddress>();
+ private final Set<SocketAddress> boundAddresses = new HashSet<>();
private boolean disconnectOnUnbind = true;
@@ -80,6 +80,7 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
/**
* {@inheritDoc}
*/
+ @Override
public SocketAddress getLocalAddress() {
Set<SocketAddress> localAddresses = getLocalAddresses();
if (localAddresses.isEmpty()) {
@@ -92,8 +93,9 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
/**
* {@inheritDoc}
*/
+ @Override
public final Set<SocketAddress> getLocalAddresses() {
- Set<SocketAddress> localAddresses = new HashSet<SocketAddress>();
+ Set<SocketAddress> localAddresses = new HashSet<>();
synchronized (boundAddresses) {
localAddresses.addAll(boundAddresses);
@@ -105,6 +107,7 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
/**
* {@inheritDoc}
*/
+ @Override
public SocketAddress getDefaultLocalAddress() {
if (defaultLocalAddresses.isEmpty()) {
return null;
@@ -115,6 +118,7 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
/**
* {@inheritDoc}
*/
+ @Override
public final void setDefaultLocalAddress(SocketAddress localAddress) {
setDefaultLocalAddresses(localAddress);
}
@@ -122,6 +126,7 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
/**
* {@inheritDoc}
*/
+ @Override
public final List<SocketAddress> getDefaultLocalAddresses() {
return unmodifiableDefaultLocalAddresses;
}
@@ -130,6 +135,7 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
* {@inheritDoc}
* @org.apache.xbean.Property nestedType="java.net.SocketAddress"
*/
+ @Override
public final void setDefaultLocalAddresses(List<? extends SocketAddress> localAddresses) {
if (localAddresses == null) {
throw new IllegalArgumentException("localAddresses");
@@ -140,6 +146,7 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
/**
* {@inheritDoc}
*/
+ @Override
public final void setDefaultLocalAddresses(Iterable<? extends SocketAddress> localAddresses) {
if (localAddresses == null) {
throw new IllegalArgumentException("localAddresses");
@@ -151,7 +158,7 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
throw new IllegalStateException("localAddress can't be set while the acceptor is bound.");
}
- Collection<SocketAddress> newLocalAddresses = new ArrayList<SocketAddress>();
+ Collection<SocketAddress> newLocalAddresses = new ArrayList<>();
for (SocketAddress a : localAddresses) {
checkAddressType(a);
@@ -172,12 +179,13 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
* {@inheritDoc}
* @org.apache.xbean.Property nestedType="java.net.SocketAddress"
*/
+ @Override
public final void setDefaultLocalAddresses(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses) {
if (otherLocalAddresses == null) {
otherLocalAddresses = new SocketAddress[0];
}
- Collection<SocketAddress> newLocalAddresses = new ArrayList<SocketAddress>(otherLocalAddresses.length + 1);
+ Collection<SocketAddress> newLocalAddresses = new ArrayList<>(otherLocalAddresses.length + 1);
newLocalAddresses.add(firstLocalAddress);
for (SocketAddress a : otherLocalAddresses) {
@@ -190,6 +198,7 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
/**
* {@inheritDoc}
*/
+ @Override
public final boolean isCloseOnDeactivation() {
return disconnectOnUnbind;
}
@@ -197,6 +206,7 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
/**
* {@inheritDoc}
*/
+ @Override
public final void setCloseOnDeactivation(boolean disconnectClientsOnUnbind) {
this.disconnectOnUnbind = disconnectClientsOnUnbind;
}
@@ -204,6 +214,7 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
/**
* {@inheritDoc}
*/
+ @Override
public final void bind() throws IOException {
bind(getDefaultLocalAddresses());
}
@@ -211,12 +222,13 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
/**
* {@inheritDoc}
*/
+ @Override
public final void bind(SocketAddress localAddress) throws IOException {
if (localAddress == null) {
throw new IllegalArgumentException("localAddress");
}
- List<SocketAddress> localAddresses = new ArrayList<SocketAddress>(1);
+ List<SocketAddress> localAddresses = new ArrayList<>(1);
localAddresses.add(localAddress);
bind(localAddresses);
}
@@ -224,13 +236,14 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
/**
* {@inheritDoc}
*/
+ @Override
public final void bind(SocketAddress... addresses) throws IOException {
if ((addresses == null) || (addresses.length == 0)) {
bind(getDefaultLocalAddresses());
return;
}
- List<SocketAddress> localAddresses = new ArrayList<SocketAddress>(2);
+ List<SocketAddress> localAddresses = new ArrayList<>(2);
for (SocketAddress address : addresses) {
localAddresses.add(address);
@@ -242,6 +255,7 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
/**
* {@inheritDoc}
*/
+ @Override
public final void bind(SocketAddress firstLocalAddress, SocketAddress... addresses) throws IOException {
if (firstLocalAddress == null) {
bind(getDefaultLocalAddresses());
@@ -252,7 +266,7 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
return;
}
- List<SocketAddress> localAddresses = new ArrayList<SocketAddress>(2);
+ List<SocketAddress> localAddresses = new ArrayList<>(2);
localAddresses.add(firstLocalAddress);
for (SocketAddress address : addresses) {
@@ -265,7 +279,8 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
/**
* {@inheritDoc}
*/
- public final void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException {
+ @Override
+public final void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException {
if (isDisposing()) {
throw new IllegalStateException("The Accpetor disposed is being disposed.");
}
@@ -274,7 +289,7 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
throw new IllegalArgumentException("localAddresses");
}
- List<SocketAddress> localAddressesCopy = new ArrayList<SocketAddress>();
+ List<SocketAddress> localAddressesCopy = new ArrayList<>();
for (SocketAddress a : localAddresses) {
checkAddressType(a);
@@ -320,6 +335,7 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
/**
* {@inheritDoc}
*/
+ @Override
public final void unbind() {
unbind(getLocalAddresses());
}
@@ -327,12 +343,13 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
/**
* {@inheritDoc}
*/
+ @Override
public final void unbind(SocketAddress localAddress) {
if (localAddress == null) {
throw new IllegalArgumentException("localAddress");
}
- List<SocketAddress> localAddresses = new ArrayList<SocketAddress>(1);
+ List<SocketAddress> localAddresses = new ArrayList<>(1);
localAddresses.add(localAddress);
unbind(localAddresses);
}
@@ -340,6 +357,7 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
/**
* {@inheritDoc}
*/
+ @Override
public final void unbind(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses) {
if (firstLocalAddress == null) {
throw new IllegalArgumentException("firstLocalAddress");
@@ -348,7 +366,7 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
throw new IllegalArgumentException("otherLocalAddresses");
}
- List<SocketAddress> localAddresses = new ArrayList<SocketAddress>();
+ List<SocketAddress> localAddresses = new ArrayList<>();
localAddresses.add(firstLocalAddress);
Collections.addAll(localAddresses, otherLocalAddresses);
unbind(localAddresses);
@@ -357,6 +375,7 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
/**
* {@inheritDoc}
*/
+ @Override
public final void unbind(Iterable<? extends SocketAddress> localAddresses) {
if (localAddresses == null) {
throw new IllegalArgumentException("localAddresses");
@@ -369,7 +388,7 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
return;
}
- List<SocketAddress> localAddressesCopy = new ArrayList<SocketAddress>();
+ List<SocketAddress> localAddressesCopy = new ArrayList<>();
int specifiedAddressCount = 0;
for (SocketAddress a : localAddresses) {
@@ -447,7 +466,7 @@ public abstract class AbstractIoAcceptor extends AbstractIoService implements Io
private final List<SocketAddress> localAddresses;
public AcceptorOperationFuture(List<? extends SocketAddress> localAddresses) {
- this.localAddresses = new ArrayList<SocketAddress>(localAddresses);
+ this.localAddresses = new ArrayList<>(localAddresses);
}
public final List<SocketAddress> getLocalAddresses() {