You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/09/30 12:51:54 UTC
svn commit: r580694 - in /mina/trunk/core/src/main/java/org/apache/mina:
common/ filter/codec/ transport/socket/nio/ transport/vmpipe/
Author: trustin
Date: Sun Sep 30 03:51:52 2007
New Revision: 580694
URL: http://svn.apache.org/viewvc?rev=580694&view=rev
Log:
* Removed unused classes implemented poorly
* Added IoService.isActive() and replaced IoAcceptor.isBound() with it
* Added IoServiceListenerSupport.isActive(); AbstractIoConnector uses it
* Moved all calls to IoSession.increaseXXX() to DefaultIoFilterChain
** Removed duplicate calls in NioDatagramAcceptor
** Moved IoSession.increaseReadMessages() call from ProtocolCodecFilter to DefaultIoFilterChain because it's more accurate when more than one ProtocolCodecFilter is used.
* Changed all unsafe operations in AbstractIoSession and AbstractIoService 'private'.
* Resolved DIRMINA-317 (add supporting statistical methods to IoService)
** Added getActivationTime() - activationTime is more useful than creationTime
** Added getReadBytes()
** Added getWrittenBytes()
** Added getReadMessages()
** Added getWrittenMessages()
** Added getScheduledWriteBytes()
** Added getScheduledWriteMessages()
Removed:
mina/trunk/core/src/main/java/org/apache/mina/common/IoAcceptorWrapper.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoConnectorWrapper.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoService.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java
mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoAcceptor.java Sun Sep 30 03:51:52 2007
@@ -125,7 +125,7 @@
getListeners().fireServiceDeactivated();
}
- public boolean isBound() {
+ public boolean isActive() {
synchronized (bindLock) {
return bound;
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java Sun Sep 30 03:51:52 2007
@@ -20,6 +20,7 @@
package org.apache.mina.common;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
/**
@@ -43,6 +44,14 @@
* Maintains the {@link IoServiceListener}s of this service.
*/
private final IoServiceListenerSupport listeners;
+
+ private volatile long activationTime;
+ private final AtomicLong readBytes = new AtomicLong();
+ private final AtomicLong writtenBytes = new AtomicLong();
+ private final AtomicLong readMessages = new AtomicLong();
+ private final AtomicLong writtenMessages = new AtomicLong();
+ private final AtomicLong scheduledWriteBytes = new AtomicLong();
+ private final AtomicLong scheduledWriteMessages = new AtomicLong();
/**
* The default {@link IoSessionConfig} which will be used to configure new sessions.
@@ -93,6 +102,10 @@
getListeners().remove(listener);
}
+ public boolean isActive() {
+ return getListeners().isActive();
+ }
+
public Set<IoSession> getManagedSessions() {
return getListeners().getManagedSessions();
}
@@ -117,6 +130,64 @@
return sessionConfig;
}
+ public long getReadBytes() {
+ return readBytes.get();
+ }
+
+ protected void increaseReadBytes(long increment) {
+ readBytes.addAndGet(increment);
+ }
+
+ public long getReadMessages() {
+ return readMessages.get();
+ }
+
+ protected void increaseReadMessages() {
+ readMessages.incrementAndGet();
+ }
+
+ public long getScheduledWriteBytes() {
+ return scheduledWriteBytes.get();
+ }
+
+ protected void increaseScheduledWriteBytes(long increment) {
+ scheduledWriteBytes.addAndGet(increment);
+ }
+
+ public long getScheduledWriteMessages() {
+ return scheduledWriteMessages.get();
+ }
+
+ protected void increaseScheduledWriteMessages() {
+ scheduledWriteMessages.incrementAndGet();
+ }
+
+ public long getActivationTime() {
+ return activationTime;
+ }
+
+ protected void setActivationTime(long activationTime) {
+ this.activationTime = activationTime;
+ }
+
+ public long getWrittenBytes() {
+ return writtenBytes.get();
+ }
+
+ protected void increaseWrittenBytes(long increment) {
+ writtenBytes.addAndGet(increment);
+ scheduledWriteBytes.addAndGet(-increment);
+ }
+
+ public long getWrittenMessages() {
+ return writtenMessages.get();
+ }
+
+ protected void increaseWrittenMessages() {
+ writtenMessages.incrementAndGet();
+ scheduledWriteMessages.decrementAndGet();
+ }
+
protected static class ServiceOperationFuture extends DefaultIoFuture {
public ServiceOperationFuture() {
super(null);
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java Sun Sep 30 03:51:52 2007
@@ -124,11 +124,11 @@
return closeFuture;
}
- public boolean isScheduledForFlush() {
+ protected boolean isScheduledForFlush() {
return scheduledForFlush.get();
}
- public boolean setScheduledForFlush(boolean flag) {
+ protected boolean setScheduledForFlush(boolean flag) {
if (flag) {
return scheduledForFlush.compareAndSet(false, true);
} else {
@@ -388,16 +388,20 @@
return scheduledWriteMessages.get();
}
- public void increaseReadBytes(int increment) {
+ protected void increaseReadBytes(long increment) {
if (increment > 0) {
readBytes += increment;
lastReadTime = System.currentTimeMillis();
idleCountForBoth = 0;
idleCountForRead = 0;
+
+ if (getService() instanceof AbstractIoService) {
+ ((AbstractIoService) getService()).increaseReadBytes(increment);
+ }
}
}
- public void increaseWrittenBytes(long increment) {
+ protected void increaseWrittenBytes(long increment) {
if (increment > 0) {
writtenBytes += increment;
lastWriteTime = System.currentTimeMillis();
@@ -405,27 +409,43 @@
idleCountForWrite = 0;
scheduledWriteBytes.addAndGet(-increment);
+
+ if (getService() instanceof AbstractIoService) {
+ ((AbstractIoService) getService()).increaseWrittenBytes(increment);
+ }
}
}
- public void increaseReadMessages() {
+ protected void increaseReadMessages() {
readMessages++;
+ if (getService() instanceof AbstractIoService) {
+ ((AbstractIoService) getService()).increaseReadMessages();
+ }
}
- public void increaseWrittenMessages() {
+ protected void increaseWrittenMessages() {
writtenMessages++;
scheduledWriteMessages.decrementAndGet();
+ if (getService() instanceof AbstractIoService) {
+ ((AbstractIoService) getService()).increaseWrittenMessages();
+ }
}
- public void increaseScheduledWriteBytes(int increment) {
+ protected void increaseScheduledWriteBytes(long increment) {
scheduledWriteBytes.addAndGet(increment);
+ if (getService() instanceof AbstractIoService) {
+ ((AbstractIoService) getService()).increaseScheduledWriteBytes(increment);
+ }
}
- public void increaseScheduledWriteMessages() {
+ protected void increaseScheduledWriteMessages() {
scheduledWriteMessages.incrementAndGet();
+ if (getService() instanceof AbstractIoService) {
+ ((AbstractIoService) getService()).increaseScheduledWriteMessages();
+ }
}
- public Queue<WriteRequest> getWriteRequestQueue() {
+ protected Queue<WriteRequest> getWriteRequestQueue() {
return writeRequestQueue;
}
@@ -493,7 +513,7 @@
throw new IllegalArgumentException("Unknown idle status: " + status);
}
- public void increaseIdleCount(IdleStatus status) {
+ protected void increaseIdleCount(IdleStatus status) {
if (status == IdleStatus.BOTH_IDLE) {
idleCountForBoth++;
lastIdleTimeForBoth = System.currentTimeMillis();
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java Sun Sep 30 03:51:52 2007
@@ -656,6 +656,7 @@
@Override
public void messageReceived(NextFilter nextFilter, IoSession session,
Object message) throws Exception {
+ ((AbstractIoSession) session).increaseReadMessages();
session.getHandler().messageReceived(session, message);
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoAcceptor.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoAcceptor.java Sun Sep 30 03:51:52 2007
@@ -80,11 +80,6 @@
void unbind();
/**
- * Returns <tt>true</tt> if and if only this service is bound to the local address.
- */
- boolean isBound();
-
- /**
* (Optional) Returns an {@link IoSession} that is bound to the current
* local address and the specified <tt>remoteAddress</tt> which reuses
* the local address that is already bound by this service.
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoService.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoService.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoService.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoService.java Sun Sep 30 03:51:52 2007
@@ -101,4 +101,21 @@
* not a {@link DefaultIoFilterChainBuilder}
*/
DefaultIoFilterChainBuilder getFilterChain();
+
+ boolean isActive();
+
+ long getActivationTime();
+
+ long getReadBytes();
+
+ long getWrittenBytes();
+
+ long getReadMessages();
+
+ long getWrittenMessages();
+
+ long getScheduledWriteBytes();
+
+ long getScheduledWriteMessages();
+
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java Sun Sep 30 03:51:52 2007
@@ -84,6 +84,10 @@
public Set<IoSession> getManagedSessions() {
return readOnlyManagedSessions;
}
+
+ public boolean isActive() {
+ return activated.get();
+ }
/**
* Calls {@link IoServiceListener#serviceActivated(IoService)}
@@ -93,6 +97,10 @@
if (!activated.compareAndSet(false, true)) {
return;
}
+
+ if (service instanceof AbstractIoService) {
+ ((AbstractIoService) service).setActivationTime(System.currentTimeMillis());
+ }
for (IoServiceListener l : listeners) {
l.serviceActivated(service);
@@ -114,6 +122,9 @@
}
} finally {
disconnectSessions();
+ if (service instanceof AbstractIoService) {
+ ((AbstractIoService) service).setActivationTime(0);
+ }
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java Sun Sep 30 03:51:52 2007
@@ -22,7 +22,6 @@
import java.net.SocketAddress;
import java.util.Queue;
-import org.apache.mina.common.AbstractIoSession;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.DefaultWriteFuture;
import org.apache.mina.common.DefaultWriteRequest;
@@ -342,14 +341,6 @@
IoSession session, NextFilter nextFilter) {
this.session = session;
this.nextFilter = nextFilter;
- }
-
- @Override
- public void write(Object message) {
- super.write(message);
- if (session instanceof AbstractIoSession) {
- ((AbstractIoSession) session).increaseReadMessages();
- }
}
public void flush() {
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java Sun Sep 30 03:51:52 2007
@@ -163,7 +163,7 @@
}
synchronized (bindLock) {
- if (!isBound()) {
+ if (!isActive()) {
throw new IllegalStateException(
"Can't create a session from a unbound service.");
}
@@ -210,7 +210,7 @@
public void setSessionRecycler(IoSessionRecycler sessionRecycler) {
synchronized (bindLock) {
- if (isBound()) {
+ if (isActive()) {
throw new IllegalStateException(
"sessionRecycler can't be set while the acceptor is bound.");
}
@@ -349,7 +349,6 @@
newBuf.put(readBuf);
newBuf.flip();
- session.increaseReadBytes(newBuf.remaining());
session.getFilterChain().fireMessageReceived(newBuf);
}
}
@@ -391,46 +390,40 @@
int writtenBytes = 0;
int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
- try {
- for (; ;) {
- WriteRequest req = writeRequestQueue.peek();
- if (req == null) {
- break;
- }
+ for (; ;) {
+ WriteRequest req = writeRequestQueue.peek();
+ if (req == null) {
+ break;
+ }
- ByteBuffer buf = (ByteBuffer) req.getMessage();
- if (buf.remaining() == 0) {
- // pop and fire event
- writeRequestQueue.poll();
- session.increaseWrittenMessages();
- buf.reset();
- session.getFilterChain().fireMessageSent(req);
- continue;
- }
+ ByteBuffer buf = (ByteBuffer) req.getMessage();
+ if (buf.remaining() == 0) {
+ // pop and fire event
+ writeRequestQueue.poll();
+ buf.reset();
+ session.getFilterChain().fireMessageSent(req);
+ continue;
+ }
- SocketAddress destination = req.getDestination();
- if (destination == null) {
- destination = session.getRemoteAddress();
- }
+ SocketAddress destination = req.getDestination();
+ if (destination == null) {
+ destination = session.getRemoteAddress();
+ }
- int localWrittenBytes = ch.send(buf.buf(), destination);
- if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
- // Kernel buffer is full or wrote too much
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- return false;
- } else {
- key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-
- // pop and fire event
- writeRequestQueue.poll();
- writtenBytes += localWrittenBytes;
- session.increaseWrittenMessages();
- buf.reset();
- session.getFilterChain().fireMessageSent(req);
- }
+ int localWrittenBytes = ch.send(buf.buf(), destination);
+ if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
+ // Kernel buffer is full or wrote too much
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ return false;
+ } else {
+ key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+
+ // pop and fire event
+ writeRequestQueue.poll();
+ writtenBytes += localWrittenBytes;
+ buf.reset();
+ session.getFilterChain().fireMessageSent(req);
}
- } finally {
- session.increaseWrittenBytes(writtenBytes);
}
return true;
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java Sun Sep 30 03:51:52 2007
@@ -24,6 +24,7 @@
import java.net.SocketException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
+import java.util.Queue;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.DefaultIoFilterChain;
@@ -35,6 +36,7 @@
import org.apache.mina.common.IoSession;
import org.apache.mina.common.RuntimeIoException;
import org.apache.mina.common.TransportMetadata;
+import org.apache.mina.common.WriteRequest;
import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
import org.apache.mina.transport.socket.DatagramSession;
import org.apache.mina.transport.socket.DatagramSessionConfig;
@@ -148,6 +150,21 @@
@Override
public InetSocketAddress getServiceAddress() {
return (InetSocketAddress) super.getServiceAddress();
+ }
+
+ @Override
+ protected Queue<WriteRequest> getWriteRequestQueue() {
+ return super.getWriteRequestQueue();
+ }
+
+ @Override
+ protected boolean isScheduledForFlush() {
+ return super.isScheduledForFlush();
+ }
+
+ @Override
+ protected boolean setScheduledForFlush(boolean flag) {
+ return super.setScheduledForFlush(flag);
}
private class SessionConfigImpl extends AbstractDatagramSessionConfig {
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java Sun Sep 30 03:51:52 2007
@@ -194,7 +194,7 @@
public void setReuseAddress(boolean reuseAddress) {
synchronized (bindLock) {
- if (isBound()) {
+ if (isActive()) {
throw new IllegalStateException(
"backlog can't be set while the acceptor is bound.");
}
@@ -209,7 +209,7 @@
public void setBacklog(int backlog) {
synchronized (bindLock) {
- if (isBound()) {
+ if (isActive()) {
throw new IllegalStateException(
"backlog can't be set while the acceptor is bound.");
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java?rev=580694&r1=580693&r2=580694&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java Sun Sep 30 03:51:52 2007
@@ -19,6 +19,7 @@
*/
package org.apache.mina.transport.vmpipe;
+import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
@@ -33,6 +34,7 @@
import org.apache.mina.common.IoServiceListenerSupport;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.TransportMetadata;
+import org.apache.mina.common.WriteRequest;
/**
* A {@link IoSession} for in-VM transport (VM_PIPE).
@@ -148,6 +150,10 @@
@Override
public VmPipeAddress getServiceAddress() {
return serviceAddress;
+ }
+
+ protected Queue<WriteRequest> getWriteRequestQueue() {
+ return super.getWriteRequestQueue();
}
Lock getLock() {