You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2008/05/13 04:56:28 UTC
svn commit: r655717 - in /mina:
sandbox/filter-traffic/src/test/java/org/apache/mina/filter/traffic/
sandbox/native/ trunk/core/src/main/java/org/apache/mina/common/
trunk/core/src/test/java/org/apache/mina/common/
trunk/core/src/test/java/org/apache/m...
Author: trustin
Date: Mon May 12 19:56:27 2008
New Revision: 655717
URL: http://svn.apache.org/viewvc?rev=655717&view=rev
Log:
Resolved DIMINA-582 (IoService.getManagedSession() should return a Map instead of a Set.)
* Replaced ConcurrentHashSet<IoSession> in IoServiceListenerSupport with ConcurrentHashMap<Long, IoSession>
* Fixed related compilation errors in various modules and abstract classes
Modified:
mina/sandbox/filter-traffic/src/test/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
mina/sandbox/native/pom.xml
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoService.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java
mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java
mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractBindTest.java
mina/trunk/integration-jmx/src/main/java/org/apache/mina/integration/jmx/IoServiceMBean.java
Modified: mina/sandbox/filter-traffic/src/test/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java
URL: http://svn.apache.org/viewvc/mina/sandbox/filter-traffic/src/test/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java?rev=655717&r1=655716&r2=655717&view=diff
==============================================================================
--- mina/sandbox/filter-traffic/src/test/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java (original)
+++ mina/sandbox/filter-traffic/src/test/java/org/apache/mina/filter/traffic/ReadThrottleFilter.java Mon May 12 19:56:27 2008
@@ -64,11 +64,11 @@
* @version $Rev$, $Date$
*/
public class ReadThrottleFilter extends IoFilterAdapter {
-
+
private static final AtomicInteger globalBufferSize = new AtomicInteger();
private static final Map<IoService, AtomicInteger> serviceBufferSizes =
new CopyOnWriteMap<IoService, AtomicInteger>();
-
+
private static final Object globalResumeLock = new Object();
private static long lastGlobalResumeTime = 0;
private final Logger logger = LoggerFactory.getLogger(getClass());
@@ -81,7 +81,7 @@
public static int getGlobalBufferSize() {
return globalBufferSize.get();
}
-
+
public static int getServiceBufferSize(IoService service) {
AtomicInteger answer = serviceBufferSizes.get(service);
if (answer == null) {
@@ -90,7 +90,7 @@
return answer.get();
}
}
-
+
private static int increaseServiceBufferSize(IoService service, int increment) {
AtomicInteger serviceBufferSize = serviceBufferSizes.get(service);
if (serviceBufferSize == null) {
@@ -105,19 +105,19 @@
}
return serviceBufferSize.addAndGet(increment);
}
-
+
private final AttributeKey STATE =
new AttributeKey(ReadThrottleFilter.class, "state");
private volatile ReadThrottlePolicy policy;
private final MessageSizeEstimator messageSizeEstimator;
-
+
private volatile int maxSessionBufferSize;
private volatile int maxServiceBufferSize;
private volatile int maxGlobalBufferSize;
-
+
private final IoFilter enterFilter = new EnterFilter();
-
+
private final ScheduledExecutorService executor;
private ScheduledFuture<?> resumeOthersFuture;
private final AtomicInteger sessionCount = new AtomicInteger();
@@ -134,19 +134,19 @@
public ReadThrottleFilter(ScheduledExecutorService executor) {
this(executor, ReadThrottlePolicy.LOG);
}
-
+
public ReadThrottleFilter(
ScheduledExecutorService executor, ReadThrottlePolicy policy) {
this(executor, policy, null);
}
-
+
public ReadThrottleFilter(
ScheduledExecutorService executor,
ReadThrottlePolicy policy, MessageSizeEstimator messageSizeEstimator) {
// 64KB, 64MB, 128MB.
this(executor, policy, messageSizeEstimator, 65536, 1048576 * 64, 1048576 * 128);
}
-
+
/**
* Creates a new instance with the specified <tt>maxSessionBufferSize</tt>,
* <tt>maxGlobalBufferSize</tt> and a new {@link DefaultMessageSizeEstimator}.
@@ -166,7 +166,7 @@
/**
* Creates a new instance with the specified <tt>maxSessionBufferSize</tt>,
* <tt>maxGlobalBufferSize</tt> and {@link MessageSizeEstimator}.
- *
+ *
* @param maxSessionBufferSize the maximum amount of data in the buffer of
* the {@link ExecutorFilter} per {@link IoSession}.
* Specify {@code 0} or a smaller value to disable.
@@ -179,7 +179,7 @@
* a new {@link DefaultMessageSizeEstimator} is created.
*/
public ReadThrottleFilter(
- ScheduledExecutorService executor,
+ ScheduledExecutorService executor,
ReadThrottlePolicy policy, MessageSizeEstimator messageSizeEstimator,
int maxSessionBufferSize, int maxServiceBufferSize, int maxGlobalBufferSize) {
if (messageSizeEstimator == null) {
@@ -201,7 +201,7 @@
if (policy == null) {
throw new NullPointerException("policy");
}
-
+
this.policy = policy;
}
@@ -212,11 +212,11 @@
public int getMaxSessionBufferSize() {
return maxSessionBufferSize;
}
-
+
public int getMaxServiceBufferSize() {
return maxServiceBufferSize;
}
-
+
/**
* Returns the maximum amount of data in the buffer of the {@link ExecutorFilter}
* for all {@link IoSession} whose {@link IoFilterChain} has been configured by
@@ -225,7 +225,7 @@
public int getMaxGlobalBufferSize() {
return maxGlobalBufferSize;
}
-
+
/**
* Sets the maximum amount of data in the buffer of the {@link ExecutorFilter}
* per {@link IoSession}. Specify {@code 0} or a smaller value to disable.
@@ -255,14 +255,14 @@
}
this.maxGlobalBufferSize = maxGlobalBufferSize;
}
-
+
/**
* Returns the size estimator currently in use.
*/
public MessageSizeEstimator getMessageSizeEstimator() {
return messageSizeEstimator;
}
-
+
/**
* Returns the current amount of data in the buffer of the {@link ExecutorFilter}
* for the specified {@link IoSession}.
@@ -272,12 +272,12 @@
if (state == null) {
return 0;
}
-
+
synchronized (state) {
return state.sessionBufferSize;
}
}
-
+
@Override
public void onPreAdd(
IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
@@ -290,11 +290,11 @@
"You can't add the same filter instance more than once. Create another instance and add it.");
}
}
-
+
@Override
public void onPostAdd(
IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
-
+
// My previous filter must be an ExecutorFilter.
IoFilter lastFilter = null;
for (IoFilterChain.Entry e: parent.getAll()) {
@@ -309,13 +309,13 @@
"an " + ExecutorFilter.class.getName() + " in the chain");
}
}
-
+
lastFilter = currentFilter;
}
-
+
// Add an entering filter before the ExecutorFilter.
parent.getEntry(lastFilter).addBefore(name + ".preprocessor", enterFilter);
-
+
int previousSessionCount = sessionCount.getAndIncrement();
if (previousSessionCount == 0) {
synchronized (resumeOthersTask) {
@@ -334,7 +334,7 @@
} catch (Exception e) {
// Ignore.
}
-
+
int currentSessionCount = sessionCount.decrementAndGet();
if (currentSessionCount == 0) {
synchronized (resumeOthersTask) {
@@ -354,20 +354,20 @@
@Override
public void filterSetTrafficMask(NextFilter nextFilter, IoSession session,
TrafficMask trafficMask) throws Exception {
-
+
if (trafficMask.isReadable()) {
State state = getState(session);
boolean suspendedRead;
synchronized (state) {
suspendedRead = state.suspendedRead;
}
-
+
// Suppress resumeRead() if read is suspended by this filter.
if (suspendedRead) {
trafficMask = trafficMask.and(TrafficMask.WRITE);
}
}
-
+
nextFilter.filterSetTrafficMask(session, trafficMask);
}
@@ -382,13 +382,13 @@
// Ignore.
}
}
-
+
@Override
public void onPostRemove(
IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
parent.getSession().removeAttribute(STATE);
}
-
+
@Override
public void messageReceived(
NextFilter nextFilter, IoSession session, Object message) throws Exception {
@@ -396,7 +396,7 @@
nextFilter.messageReceived(session, message);
}
}
-
+
private int estimateSize(Object message) {
int size = messageSizeEstimator.estimateSize(message);
if (size < 0) {
@@ -416,16 +416,16 @@
int maxGlobalBufferSize = this.maxGlobalBufferSize;
int maxServiceBufferSize = this.maxServiceBufferSize;
int maxSessionBufferSize = this.maxSessionBufferSize;
-
+
ReadThrottlePolicy policy = getPolicy();
-
+
boolean enforcePolicy = false;
int sessionBufferSize;
synchronized (state) {
- sessionBufferSize = (state.sessionBufferSize += size);
- if ((maxSessionBufferSize != 0 && sessionBufferSize >= maxSessionBufferSize) ||
- (maxServiceBufferSize != 0 && serviceBufferSize >= maxServiceBufferSize) ||
- (maxGlobalBufferSize != 0 && globalBufferSize >= maxGlobalBufferSize)) {
+ sessionBufferSize = state.sessionBufferSize += size;
+ if (maxSessionBufferSize != 0 && sessionBufferSize >= maxSessionBufferSize ||
+ maxServiceBufferSize != 0 && serviceBufferSize >= maxServiceBufferSize ||
+ maxGlobalBufferSize != 0 && globalBufferSize >= maxGlobalBufferSize) {
enforcePolicy = true;
switch (policy) {
case EXCEPTION:
@@ -438,7 +438,7 @@
if (logger.isDebugEnabled()) {
logger.debug(getMessage(session, " Entered - "));
}
-
+
if (enforcePolicy) {
switch (policy) {
case CLOSE:
@@ -467,7 +467,7 @@
logger.debug(getMessage(session, "Suspended - "));
}
}
-
+
private void exit(IoSession session, int size) {
State state = getState(session);
@@ -475,7 +475,7 @@
if (globalBufferSize < 0) {
throw new IllegalStateException("globalBufferSize: " + globalBufferSize);
}
-
+
int serviceBufferSize = increaseServiceBufferSize(session.getService(), -size);
if (serviceBufferSize < 0) {
throw new IllegalStateException("serviceBufferSize: " + serviceBufferSize);
@@ -484,12 +484,12 @@
int maxGlobalBufferSize = this.maxGlobalBufferSize;
int maxServiceBufferSize = this.maxServiceBufferSize;
int maxSessionBufferSize = this.maxSessionBufferSize;
-
+
int sessionBufferSize;
-
+
boolean enforcePolicy = false;
synchronized (state) {
- sessionBufferSize = (state.sessionBufferSize -= size);
+ sessionBufferSize = state.sessionBufferSize -= size;
if (sessionBufferSize < 0) {
throw new IllegalStateException("sessionBufferSize: " + sessionBufferSize);
}
@@ -500,24 +500,24 @@
enforcePolicy = true;
}
}
-
+
if (logger.isDebugEnabled()) {
logger.debug(getMessage(session, " Exited - "));
}
-
+
if (enforcePolicy) {
session.resumeRead();
if (logger.isDebugEnabled()) {
logger.debug(getMessage(session, " Resumed - "));
}
}
-
+
resumeOthers();
}
-
+
private void resumeOthers() {
long currentTime = System.currentTimeMillis();
-
+
// Try to resume other sessions every other second.
boolean resumeOthers;
synchronized (globalResumeLock) {
@@ -528,21 +528,21 @@
resumeOthers = false;
}
}
-
+
if (resumeOthers) {
int maxGlobalBufferSize = this.maxGlobalBufferSize;
if (maxGlobalBufferSize == 0 || globalBufferSize.get() < maxGlobalBufferSize) {
List<IoService> inactiveServices = null;
for (IoService service: serviceBufferSizes.keySet()) {
resumeService(service);
-
+
if (!service.isActive()) {
if (inactiveServices == null) {
inactiveServices = new ArrayList<IoService>();
}
inactiveServices.add(service);
}
-
+
// Remove inactive services from the map.
if (inactiveServices != null) {
for (IoService s: inactiveServices) {
@@ -557,26 +557,26 @@
}
}
}
-
+
private void resumeService(IoService service) {
int maxServiceBufferSize = this.maxServiceBufferSize;
if (maxServiceBufferSize == 0 || getServiceBufferSize(service) < maxServiceBufferSize) {
- for (IoSession session: service.getManagedSessions()) {
+ for (IoSession session: service.getManagedSessions().values()) {
resume(session);
}
}
}
-
+
private void resume(IoSession session) {
State state = (State) session.getAttribute(STATE);
if (state == null) {
return;
}
-
+
int maxSessionBufferSize = this.maxSessionBufferSize;
boolean resume = false;
synchronized (state) {
- if ((maxSessionBufferSize == 0 || state.sessionBufferSize < maxSessionBufferSize)) {
+ if (maxSessionBufferSize == 0 || state.sessionBufferSize < maxSessionBufferSize) {
state.suspendedRead = false;
resume = true;
}
@@ -592,7 +592,7 @@
private void log(IoSession session, State state) {
long currentTime = System.currentTimeMillis();
-
+
// Prevent log flood by logging every 3 seconds.
boolean log;
synchronized (state.logLock) {
@@ -603,20 +603,20 @@
log = false;
}
}
-
+
if (log) {
logger.warn(getMessage(session));
}
}
-
+
private void raiseException(IoSession session) {
throw new ReadFloodException(getMessage(session));
}
-
+
private String getMessage(IoSession session) {
return getMessage(session, "Read buffer flooded - ");
}
-
+
private String getMessage(IoSession session, String prefix) {
int sessionLimit = maxSessionBufferSize;
int serviceLimit = maxServiceBufferSize;
@@ -634,7 +634,7 @@
buf.append(getSessionBufferSize(session));
buf.append(" / unlimited bytes, ");
}
-
+
buf.append("service: ");
if (serviceLimit != 0) {
buf.append(getServiceBufferSize(session.getService()));
@@ -645,7 +645,7 @@
buf.append(getServiceBufferSize(session.getService()));
buf.append(" / unlimited bytes, ");
}
-
+
buf.append("global: ");
if (globalLimit != 0) {
buf.append(getGlobalBufferSize());
@@ -656,10 +656,10 @@
buf.append(getGlobalBufferSize());
buf.append(" / unlimited bytes.");
}
-
+
return buf.toString();
}
-
+
private State getState(IoSession session) {
State state = (State) session.getAttribute(STATE);
if (state == null) {
@@ -671,7 +671,7 @@
}
return state;
}
-
+
@Override
public String toString() {
return String.valueOf(getGlobalBufferSize()) + '/' + getMaxGlobalBufferSize();
Modified: mina/sandbox/native/pom.xml
URL: http://svn.apache.org/viewvc/mina/sandbox/native/pom.xml?rev=655717&r1=655716&r2=655717&view=diff
==============================================================================
--- mina/sandbox/native/pom.xml (original)
+++ mina/sandbox/native/pom.xml Mon May 12 19:56:27 2008
@@ -17,6 +17,10 @@
<groupId>${groupId}</groupId>
<artifactId>mina-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>tomcat</groupId>
+ <artifactId>tomcat-apr</artifactId>
+ </dependency>
</dependencies>
</project>
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java?rev=655717&r1=655716&r2=655717&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java Mon May 12 19:56:27 2008
@@ -22,6 +22,7 @@
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@@ -261,7 +262,7 @@
*/
protected abstract IoFuture dispose0() throws Exception;
- public final Set<IoSession> getManagedSessions() {
+ public final Map<Long, IoSession> getManagedSessions() {
return listeners.getManagedSessions();
}
@@ -726,7 +727,7 @@
// direct caller of MessageBroadcaster knows the order of write
// operations.
final List<WriteFuture> futures = IoUtil.broadcast(
- message, getManagedSessions());
+ message, getManagedSessions().values());
return new AbstractSet<WriteFuture>() {
@Override
public Iterator<WriteFuture> iterator() {
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java?rev=655717&r1=655716&r2=655717&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java Mon May 12 19:56:27 2008
@@ -44,9 +44,9 @@
private final Object lock = new Object();
private final IoProcessor<T> processor = new ConnectionlessAcceptorProcessor();
- private final Queue<AcceptorOperationFuture> registerQueue =
+ private final Queue<AcceptorOperationFuture> registerQueue =
new ConcurrentLinkedQueue<AcceptorOperationFuture>();
- private final Queue<AcceptorOperationFuture> cancelQueue =
+ private final Queue<AcceptorOperationFuture> cancelQueue =
new ConcurrentLinkedQueue<AcceptorOperationFuture>();
private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
private final Map<SocketAddress, H> boundHandles =
@@ -270,7 +270,7 @@
cancelQueue.clear();
flushingSessions.clear();
}
-
+
synchronized (lock) {
if (worker == null) {
worker = new Worker();
@@ -326,7 +326,7 @@
}
}
}
-
+
if (selectable && isDisposing()) {
selectable = false;
try {
@@ -351,7 +351,7 @@
}
if (isWritable(h)) {
- for (IoSession session : getManagedSessions()) {
+ for (IoSession session : getManagedSessions().values()) {
scheduleFlush((T) session);
}
}
@@ -404,7 +404,7 @@
private boolean flush(T session, long currentTime) throws Exception {
// Clear OP_WRITE
setInterestedInWrite(session, false);
-
+
final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
final int maxWrittenBytes =
session.getConfig().getMaxReadBufferSize() +
@@ -421,7 +421,7 @@
}
session.setCurrentWriteRequest(req);
}
-
+
IoBuffer buf = (IoBuffer) req.getMessage();
if (buf.remaining() == 0) {
// Clear and fire event
@@ -430,12 +430,12 @@
session.getFilterChain().fireMessageSent(req);
continue;
}
-
+
SocketAddress destination = req.getDestination();
if (destination == null) {
destination = session.getRemoteAddress();
}
-
+
int localWrittenBytes = send(session, buf, destination);
if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
// Kernel buffer is full or wrote too much
@@ -443,7 +443,7 @@
return false;
} else {
setInterestedInWrite(session, false);
-
+
// Clear and fire event
session.setCurrentWriteRequest(null);
writtenBytes += localWrittenBytes;
@@ -473,7 +473,7 @@
newHandles.put(localAddress(handle), handle);
}
boundHandles.putAll(newHandles);
-
+
getListeners().fireServiceActivated();
req.setDone();
return newHandles.size();
@@ -493,7 +493,7 @@
}
}
}
-
+
return 0;
}
@@ -521,10 +521,10 @@
nHandles ++;
}
}
-
+
request.setDone();
}
-
+
return nHandles;
}
@@ -533,7 +533,7 @@
if (currentTime - lastIdleCheckTime >= 1000) {
lastIdleCheckTime = currentTime;
IdleStatusChecker.notifyIdleness(
- getListeners().getManagedSessions().iterator(),
+ getListeners().getManagedSessions().values().iterator(),
currentTime);
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java?rev=655717&r1=655716&r2=655717&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java Mon May 12 19:56:27 2008
@@ -46,7 +46,7 @@
sessions.add(session);
session.getCloseFuture().addListener(sessionCloseListener);
}
-
+
public void addService(AbstractIoService service) {
services.add(service);
}
@@ -54,20 +54,20 @@
public void removeSession(AbstractIoSession session) {
sessions.remove(session);
}
-
+
public void removeService(AbstractIoService service) {
services.remove(service);
}
-
+
public NotifyingTask getNotifyingTask() {
return notifyingTask;
}
-
+
public interface NotifyingTask extends Runnable {
/**
* Cancels this task. Once canceled, {@link #run()} method will always return immediately.
* To start this task again after calling this method, you have to create a new instance of
- * {@link IdleStatusChecker} again.
+ * {@link IdleStatusChecker} again.
*/
void cancel();
}
@@ -75,7 +75,7 @@
private class NotifyingTaskImpl implements NotifyingTask {
private volatile boolean cancelled;
private volatile Thread thread;
-
+
public void run() {
thread = Thread.currentThread();
try {
@@ -84,7 +84,7 @@
long currentTime = System.currentTimeMillis();
notifyServices(currentTime);
notifySessions(currentTime);
-
+
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
@@ -95,7 +95,7 @@
thread = null;
}
}
-
+
public void cancel() {
cancelled = true;
Thread thread = this.thread;
@@ -124,7 +124,7 @@
}
}
}
-
+
private class SessionCloseListener implements IoFutureListener<IoFuture> {
public void operationComplete(IoFuture future) {
removeSession((AbstractIoSession) future.getSession());
@@ -148,16 +148,16 @@
public static void notifyIdleness(IoService service, long currentTime) {
notifyIdleness(service, currentTime, true);
}
-
+
private static void notifyIdleness(IoService service, long currentTime, boolean includeSessions) {
if (!(service instanceof AbstractIoService)) {
return;
}
-
+
((AbstractIoService) service).notifyIdleness(currentTime);
-
+
if (includeSessions) {
- notifyIdleness(service.getManagedSessions().iterator(), currentTime);
+ notifyIdleness(service.getManagedSessions().values().iterator(), currentTime);
}
}
@@ -176,21 +176,21 @@
IdleStatus.BOTH_IDLE, Math.max(
s.getLastIoTime(),
s.getLastIdleTime(IdleStatus.BOTH_IDLE)));
-
+
notifyIdleSession1(
s, currentTime,
s.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
IdleStatus.READER_IDLE, Math.max(
s.getLastReadTime(),
s.getLastIdleTime(IdleStatus.READER_IDLE)));
-
+
notifyIdleSession1(
s, currentTime,
s.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
IdleStatus.WRITER_IDLE, Math.max(
s.getLastWriteTime(),
s.getLastIdleTime(IdleStatus.WRITER_IDLE)));
-
+
notifyWriteTimeout(s, currentTime);
updateThroughput(s, currentTime);
} else {
@@ -200,14 +200,14 @@
IdleStatus.BOTH_IDLE, Math.max(
session.getLastIoTime(),
session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
-
+
notifyIdleSession0(
session, currentTime,
session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
IdleStatus.READER_IDLE, Math.max(
session.getLastReadTime(),
session.getLastIdleTime(IdleStatus.READER_IDLE)));
-
+
notifyIdleSession0(
session, currentTime,
session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoService.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoService.java?rev=655717&r1=655716&r2=655717&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoService.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoService.java Mon May 12 19:56:27 2008
@@ -20,6 +20,7 @@
package org.apache.mina.common;
import java.util.Collection;
+import java.util.Map;
import java.util.Set;
/**
@@ -78,11 +79,13 @@
void setHandler(IoHandler handler);
/**
- * Returns all sessions which are currently managed by this service.
+ * Returns the map of all sessions which are currently managed by this
+ * service. The key of map is the {@link IoSession#getId() ID} of the
+ * session.
*
* @return the sessions. An empty collection if there's no session.
*/
- Set<IoSession> getManagedSessions();
+ Map<Long, IoSession> getManagedSessions();
/**
* Returns the number of all sessions which are currently managed by this
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java?rev=655717&r1=655716&r2=655717&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java Mon May 12 19:56:27 2008
@@ -21,12 +21,12 @@
import java.util.Collections;
import java.util.List;
-import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.mina.util.ConcurrentHashSet;
-
/**
* A helper which provides addition and removal of {@link IoServiceListener}s and firing
* events.
@@ -48,13 +48,13 @@
/**
* Tracks managed sessions.
*/
- private final Set<IoSession> managedSessions = new ConcurrentHashSet<IoSession>();
+ private final ConcurrentMap<Long, IoSession> managedSessions = new ConcurrentHashMap<Long, IoSession>();
/**
* Read only version of {@link #managedSessions}.
*/
- private final Set<IoSession> readOnlyManagedSessions = Collections.unmodifiableSet(managedSessions);
-
+ private final Map<Long, IoSession> readOnlyManagedSessions = Collections.unmodifiableMap(managedSessions);
+
private final AtomicBoolean activated = new AtomicBoolean();
private volatile long activationTime;
private volatile int largestManagedSessionCount;
@@ -83,23 +83,23 @@
public void remove(IoServiceListener listener) {
listeners.remove(listener);
}
-
+
public long getActivationTime() {
return activationTime;
}
- public Set<IoSession> getManagedSessions() {
+ public Map<Long, IoSession> getManagedSessions() {
return readOnlyManagedSessions;
}
-
+
public int getManagedSessionCount() {
return managedSessions.size();
}
-
+
public int getLargestManagedSessionCount() {
return largestManagedSessionCount;
}
-
+
public long getCumulativeManagedSessionCount() {
return cumulativeManagedSessionCount;
}
@@ -127,7 +127,7 @@
}
}
}
-
+
/**
* Calls {@link IoServiceListener#serviceIdle(IoService, IdleStatus)}
* for all registered listeners.
@@ -181,10 +181,10 @@
}
// If already registered, ignore.
- if (!managedSessions.add(session)) {
+ if (managedSessions.putIfAbsent(Long.valueOf(session.getId()), session) != null) {
return;
}
-
+
// If the first connector session, fire a virtual service activation event.
if (firstSession) {
fireServiceActivated();
@@ -215,7 +215,7 @@
*/
public void fireSessionDestroyed(IoSession session) {
// Try to remove the remaining empty session set after removal.
- if (!managedSessions.remove(session)) {
+ if (managedSessions.remove(Long.valueOf(session.getId())) == null) {
return;
}
@@ -257,7 +257,7 @@
Object lock = new Object();
IoFutureListener<IoFuture> listener = new LockNotifyingListener(lock);
- for (IoSession s : managedSessions) {
+ for (IoSession s : managedSessions.values()) {
s.close().addListener(listener);
}
Modified: mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java?rev=655717&r1=655716&r2=655717&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java Mon May 12 19:56:27 2008
@@ -106,7 +106,7 @@
handlerControl.verify();
Assert.assertEquals(1, support.getManagedSessions().size());
- Assert.assertTrue(support.getManagedSessions().contains(session));
+ Assert.assertSame(session, support.getManagedSessions().get(session.getId()));
// Test destruction & other side effects
listenerControl.reset();
@@ -126,7 +126,7 @@
Assert.assertTrue(session.isClosing());
Assert.assertEquals(0, support.getManagedSessions().size());
- Assert.assertFalse(support.getManagedSessions().contains(session));
+ Assert.assertNull(support.getManagedSessions().get(session.getId()));
}
public void testDisconnectOnUnbind() throws Exception {
@@ -207,7 +207,7 @@
Assert.assertTrue(session.isClosing());
Assert.assertEquals(0, support.getManagedSessions().size());
- Assert.assertFalse(support.getManagedSessions().contains(session));
+ Assert.assertNull(support.getManagedSessions().get(session.getId()));
}
public void testConnectorActivation() throws Exception {
@@ -263,6 +263,6 @@
handlerControl.verify();
Assert.assertEquals(0, support.getManagedSessions().size());
- Assert.assertFalse(support.getManagedSessions().contains(session));
+ Assert.assertNull(support.getManagedSessions().get(session.getId()));
}
}
Modified: mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractBindTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractBindTest.java?rev=655717&r1=655716&r2=655717&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractBindTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractBindTest.java Mon May 12 19:56:27 2008
@@ -172,7 +172,7 @@
// Wait for the server side sessions to be created.
Thread.sleep(500);
- Collection<IoSession> managedSessions = acceptor.getManagedSessions();
+ Collection<IoSession> managedSessions = acceptor.getManagedSessions().values();
Assert.assertEquals(5, managedSessions.size());
acceptor.unbind();
Modified: mina/trunk/integration-jmx/src/main/java/org/apache/mina/integration/jmx/IoServiceMBean.java
URL: http://svn.apache.org/viewvc/mina/trunk/integration-jmx/src/main/java/org/apache/mina/integration/jmx/IoServiceMBean.java?rev=655717&r1=655716&r2=655717&view=diff
==============================================================================
--- mina/trunk/integration-jmx/src/main/java/org/apache/mina/integration/jmx/IoServiceMBean.java (original)
+++ mina/trunk/integration-jmx/src/main/java/org/apache/mina/integration/jmx/IoServiceMBean.java Mon May 12 19:56:27 2008
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.Set;
+import javax.management.MBeanOperationInfo;
import javax.management.MBeanParameterInfo;
import javax.management.ObjectName;
import javax.management.modelmbean.ModelMBeanOperationInfo;
@@ -33,7 +34,7 @@
/**
* A JMX MBean wrapper for an {@link IoSession}.
- *
+ *
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$
*/
@@ -48,7 +49,7 @@
id = "0x" + id;
return id;
}
-
+
public IoServiceMBean(IoService source) {
super(source);
}
@@ -57,35 +58,35 @@
protected Object invoke0(String name, Object[] params, String[] signature) throws Exception {
if (name.equals("findSessions")) {
IoSessionFinder finder = new IoSessionFinder((String) params[0]);
- return finder.find(getSource().getManagedSessions());
+ return finder.find(getSource().getManagedSessions().values());
}
-
+
if (name.equals("findAndRegisterSessions")) {
IoSessionFinder finder = new IoSessionFinder((String) params[0]);
Set<IoSession> registeredSessions = new LinkedHashSet<IoSession>();
- for (IoSession s: finder.find(getSource().getManagedSessions())) {
+ for (IoSession s: finder.find(getSource().getManagedSessions().values())) {
try {
getServer().registerMBean(
new IoSessionMBean(s),
new ObjectName(
- getName().getDomain() +
- ":type=session,name=" +
+ getName().getDomain() +
+ ":type=session,name=" +
getSessionIdAsString(s.getId())));
registeredSessions.add(s);
} catch (Exception e) {
logger.warn("Failed to register a session as a MBean: " + s, e);
}
}
-
+
return registeredSessions;
}
-
+
if (name.equals("findAndProcessSessions")) {
IoSessionFinder finder = new IoSessionFinder((String) params[0]);
String command = (String) params[1];
Object expr = Ognl.parseExpression(command);
- Set<IoSession> matches = finder.find(getSource().getManagedSessions());
-
+ Set<IoSession> matches = finder.find(getSource().getManagedSessions().values());
+
for (IoSession s: matches) {
try {
Ognl.getValue(expr, s);
@@ -95,7 +96,7 @@
}
return matches;
}
-
+
return super.invoke0(name, params, signature);
}
@@ -106,13 +107,13 @@
new MBeanParameterInfo[] {
new MBeanParameterInfo(
"ognlQuery", String.class.getName(), "a boolean OGNL expression")
- }, Set.class.getName(), ModelMBeanOperationInfo.INFO));
+ }, Set.class.getName(), MBeanOperationInfo.INFO));
operations.add(new ModelMBeanOperationInfo(
"findAndRegisterSessions", "findAndRegisterSessions",
new MBeanParameterInfo[] {
new MBeanParameterInfo(
"ognlQuery", String.class.getName(), "a boolean OGNL expression")
- }, Set.class.getName(), ModelMBeanOperationInfo.ACTION_INFO));
+ }, Set.class.getName(), MBeanOperationInfo.ACTION_INFO));
operations.add(new ModelMBeanOperationInfo(
"findAndProcessSessions", "findAndProcessSessions",
new MBeanParameterInfo[] {
@@ -120,7 +121,7 @@
"ognlQuery", String.class.getName(), "a boolean OGNL expression"),
new MBeanParameterInfo(
"ognlCommand", String.class.getName(), "an OGNL expression that modifies the state of the sessions in the match result")
- }, Set.class.getName(), ModelMBeanOperationInfo.ACTION_INFO));
+ }, Set.class.getName(), MBeanOperationInfo.ACTION_INFO));
}
@Override
@@ -130,13 +131,13 @@
"(newSession|broadcast|(add|remove)Listener)")) {
return false;
}
-
+
if ((methodName.equals("bind") || methodName.equals("unbind")) &&
(paramTypes.length > 1 ||
- (paramTypes.length == 1 && !SocketAddress.class.isAssignableFrom(paramTypes[0])))) {
+ paramTypes.length == 1 && !SocketAddress.class.isAssignableFrom(paramTypes[0]))) {
return false;
}
-
+
return super.isOperation(methodName, paramTypes);
}
}