You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/11/21 03:31:57 UTC
svn commit: r596909 - in /mina/trunk/core/src:
main/java/org/apache/mina/common/
main/java/org/apache/mina/transport/socket/nio/
test/java/org/apache/mina/transport/socket/nio/
Author: trustin
Date: Tue Nov 20 18:31:56 2007
New Revision: 596909
URL: http://svn.apache.org/viewvc?rev=596909&view=rev
Log:
* Added an abstraction layer for connectionless polling acceptors - AbstractPollingConnectionlessIoAcceptor
* DatagramAcceptor now extends AbstractPollingConnectionlessIoAcceptor
Added:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java (with props)
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoConnector.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java
mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
Added: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java?rev=596909&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java Tue Nov 20 18:31:56 2007
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.ClosedSelectorException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.transport.socket.DatagramSessionConfig;
+import org.apache.mina.util.NamePreservingRunnable;
+import org.apache.mina.util.NewThreadExecutor;
+
+/**
+ * {@link IoAcceptor} for datagram transport (UDP/IP).
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public abstract class AbstractPollingConnectionlessIoAcceptor<T extends AbstractIoSession, H>
+ extends AbstractIoAcceptor {
+
+ private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
+
+ private static final AtomicInteger id = new AtomicInteger();
+
+ private final Executor executor;
+ private final String threadName;
+ private final IoProcessor<T> processor = new ConnectionlessAcceptorProcessor();
+ private final Queue<ServiceOperationFuture> registerQueue = new ConcurrentLinkedQueue<ServiceOperationFuture>();
+ private final Queue<ServiceOperationFuture> cancelQueue = new ConcurrentLinkedQueue<ServiceOperationFuture>();
+ private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
+ private final Map<SocketAddress, H> boundHandles =
+ Collections.synchronizedMap(new HashMap<SocketAddress, H>());
+
+ private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
+
+ private Worker worker;
+ private long lastIdleCheckTime;
+
+ /**
+ * Creates a new instance.
+ */
+ protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig) {
+ this(sessionConfig, new NewThreadExecutor());
+ }
+
+ /**
+ * Creates a new instance.
+ */
+ protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {
+ super(sessionConfig);
+
+ threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
+ this.executor = executor;
+
+ doInit();
+ }
+
+ protected abstract void doInit();
+ protected abstract void doDispose0();
+ protected abstract boolean selectable();
+ protected abstract boolean select(int timeout) throws Exception;
+ protected abstract void wakeup();
+ protected abstract Iterator<H> selectedHandles();
+ protected abstract H bind(SocketAddress localAddress) throws Exception;
+ protected abstract void unbind(H handle) throws Exception;
+ protected abstract SocketAddress localAddress(H handle) throws Exception;
+ protected abstract boolean isReadable(H handle);
+ protected abstract boolean isWritable(H handle);
+ protected abstract SocketAddress receive(H handle, IoBuffer buffer) throws Exception;
+ protected abstract int send(T session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception;
+ protected abstract T newSession(H handle, SocketAddress remoteAddress);
+ protected abstract void setInterestedInWrite(T session, boolean interested) throws Exception;
+
+
+ @Override
+ public DatagramSessionConfig getSessionConfig() {
+ return (DatagramSessionConfig) super.getSessionConfig();
+ }
+
+ @Override
+ public InetSocketAddress getLocalAddress() {
+ return (InetSocketAddress) super.getLocalAddress();
+ }
+
+ public void setLocalAddress(InetSocketAddress localAddress) {
+ setLocalAddress((SocketAddress) localAddress);
+ }
+
+ @Override
+ protected void doBind() throws Exception {
+ ServiceOperationFuture request = new ServiceOperationFuture();
+
+ registerQueue.add(request);
+ startupWorker();
+ wakeup();
+
+ request.awaitUninterruptibly();
+
+ if (request.getException() != null) {
+ throw request.getException();
+ }
+
+ Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
+ for (H handle: boundHandles.values()) {
+ newLocalAddresses.add(localAddress(handle));
+ }
+ setLocalAddresses(newLocalAddresses);
+ }
+
+ @Override
+ protected void doUnbind() throws Exception {
+ ServiceOperationFuture request = new ServiceOperationFuture();
+
+ cancelQueue.add(request);
+ startupWorker();
+ wakeup();
+
+ request.awaitUninterruptibly();
+
+ if (request.getException() != null) {
+ throw request.getException();
+ }
+ }
+
+ public IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
+ if (isDisposed()) {
+ throw new IllegalStateException("Already disposed.");
+ }
+
+ if (remoteAddress == null) {
+ throw new NullPointerException("remoteAddress");
+ }
+
+ synchronized (bindLock) {
+ if (!isActive()) {
+ throw new IllegalStateException(
+ "Can't create a session from a unbound service.");
+ }
+
+ return newSessionWithoutLock(remoteAddress, localAddress);
+ }
+ }
+
+ private IoSession newSessionWithoutLock(
+ SocketAddress remoteAddress, SocketAddress localAddress) {
+ H handle = boundHandles.get(localAddress);
+ if (handle == null) {
+ throw new IllegalArgumentException("Unknown local address: " + localAddress);
+ }
+
+ IoSession session;
+ IoSessionRecycler sessionRecycler = getSessionRecycler();
+ synchronized (sessionRecycler) {
+ session = sessionRecycler.recycle(localAddress, remoteAddress);
+ if (session != null) {
+ return session;
+ }
+
+ // If a new session needs to be created.
+ T newSession = newSession(handle, remoteAddress);
+ getSessionRecycler().put(newSession);
+ session = newSession;
+ }
+
+ finishSessionInitialization(session, null);
+
+ try {
+ this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ getListeners().fireSessionCreated(session);
+ } catch (Throwable t) {
+ ExceptionMonitor.getInstance().exceptionCaught(t);
+ }
+
+ return session;
+ }
+
+ public IoSessionRecycler getSessionRecycler() {
+ return sessionRecycler;
+ }
+
+ public void setSessionRecycler(IoSessionRecycler sessionRecycler) {
+ synchronized (bindLock) {
+ if (isActive()) {
+ throw new IllegalStateException(
+ "sessionRecycler can't be set while the acceptor is bound.");
+ }
+
+ if (sessionRecycler == null) {
+ sessionRecycler = DEFAULT_RECYCLER;
+ }
+ this.sessionRecycler = sessionRecycler;
+ }
+ }
+
+ @Override
+ protected IoServiceListenerSupport getListeners() {
+ return super.getListeners();
+ }
+
+ protected IoProcessor<T> getProcessor() {
+ return processor;
+ }
+
+ private class ConnectionlessAcceptorProcessor implements IoProcessor<T> {
+
+ public void add(T session) {
+ }
+
+ public void flush(T session) {
+ if (scheduleFlush(session)) {
+ wakeup();
+ }
+ }
+
+ public void remove(T session) {
+ getSessionRecycler().remove(session);
+ getListeners().fireSessionDestroyed(session);
+ }
+
+ public void updateTrafficMask(T session) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void dispose() {
+ }
+ }
+
+ private void startupWorker() {
+ if (!selectable()) {
+ registerQueue.clear();
+ cancelQueue.clear();
+ flushingSessions.clear();
+ throw new ClosedSelectorException();
+ }
+ synchronized (this) {
+ if (worker == null) {
+ worker = new Worker();
+ executor.execute(
+ new NamePreservingRunnable(worker, threadName));
+ }
+ }
+ }
+
+ private boolean scheduleFlush(T session) {
+ if (session.setScheduledForFlush(true)) {
+ flushingSessions.add(session);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private class Worker implements Runnable {
+ public void run() {
+ int nHandles = 0;
+ lastIdleCheckTime = System.currentTimeMillis();
+
+ for (; ;) {
+ try {
+ boolean selected = select(1000);
+
+ nHandles += registerHandles();
+
+ if (selected) {
+ processReadySessions(selectedHandles());
+ }
+
+ flushSessions();
+ nHandles -= unregisterHandles();
+
+ notifyIdleSessions();
+
+ if (nHandles == 0) {
+ synchronized (AbstractPollingConnectionlessIoAcceptor.this) {
+ if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
+ worker = null;
+ break;
+ }
+ }
+ }
+ } catch (Exception e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ }
+ }
+ }
+
+ if (isDisposed()) {
+ doDispose0();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void processReadySessions(Iterator<H> handles) {
+ while (handles.hasNext()) {
+ H h = handles.next();
+ handles.remove();
+ try {
+ if (isReadable(h)) {
+ readHandle(h);
+ }
+
+ if (isWritable(h)) {
+ for (IoSession session : getManagedSessions()) {
+ scheduleFlush((T) session);
+ }
+ }
+ } catch (Throwable t) {
+ ExceptionMonitor.getInstance().exceptionCaught(t);
+ }
+ }
+ }
+
+ private void readHandle(H handle) throws Exception {
+ IoBuffer readBuf = IoBuffer.allocate(getSessionConfig()
+ .getReceiveBufferSize());
+
+ SocketAddress remoteAddress = receive(handle, readBuf);
+ if (remoteAddress != null) {
+ IoSession session = newSessionWithoutLock(
+ remoteAddress, localAddress(handle));
+
+ readBuf.flip();
+
+ IoBuffer newBuf = IoBuffer.allocate(readBuf.limit());
+ newBuf.put(readBuf);
+ newBuf.flip();
+
+ session.getFilterChain().fireMessageReceived(newBuf);
+ }
+ }
+
+ private void flushSessions() {
+ for (; ;) {
+ T session = flushingSessions.poll();
+ if (session == null) {
+ break;
+ }
+
+ session.setScheduledForFlush(false);
+
+ try {
+ boolean flushedAll = flush(session);
+ if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
+ !session.isScheduledForFlush()) {
+ scheduleFlush(session);
+ }
+ } catch (Exception e) {
+ session.getFilterChain().fireExceptionCaught(e);
+ }
+ }
+ }
+
+ private boolean flush(T session) throws Exception {
+ // Clear OP_WRITE
+ setInterestedInWrite(session, false);
+
+ WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
+
+ int maxWrittenBytes =
+ session.getConfig().getMaxReadBufferSize() +
+ (session.getConfig().getMaxReadBufferSize() >>> 1);
+
+ int writtenBytes = 0;
+ for (; ;) {
+ WriteRequest req = session.getCurrentWriteRequest();
+ if (req == null) {
+ req = writeRequestQueue.poll(session);
+ if (req == null) {
+ break;
+ }
+ session.setCurrentWriteRequest(req);
+ }
+
+ IoBuffer buf = (IoBuffer) req.getMessage();
+ if (buf.remaining() == 0) {
+ // Clear and fire event
+ session.setCurrentWriteRequest(null);
+ buf.reset();
+ session.getFilterChain().fireMessageSent(req);
+ continue;
+ }
+
+ SocketAddress destination = req.getDestination();
+ if (destination == null) {
+ destination = session.getRemoteAddress();
+ }
+
+ int localWrittenBytes = send(session, buf, destination);
+ if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
+ // Kernel buffer is full or wrote too much
+ setInterestedInWrite(session, true);
+ return false;
+ } else {
+ setInterestedInWrite(session, false);
+
+ // Clear and fire event
+ session.setCurrentWriteRequest(null);
+ writtenBytes += localWrittenBytes;
+ buf.reset();
+ session.getFilterChain().fireMessageSent(req);
+ }
+ }
+
+ return true;
+ }
+
+ private int registerHandles() {
+ if (registerQueue.isEmpty()) {
+ return 0;
+ }
+
+ for (; ;) {
+ ServiceOperationFuture req = registerQueue.poll();
+ if (req == null) {
+ break;
+ }
+
+ Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
+ List<SocketAddress> localAddresses = getLocalAddresses();
+ try {
+ for (SocketAddress a: localAddresses) {
+ H handle = bind(a);
+ newHandles.put(localAddress(handle), handle);
+ }
+
+ boundHandles.putAll(newHandles);
+
+ getListeners().fireServiceActivated();
+ req.setDone();
+ return boundHandles.size();
+ } catch (Exception e) {
+ req.setException(e);
+ } finally {
+ // Roll back if failed to bind all addresses.
+ if (req.getException() != null) {
+ for (H handle: newHandles.values()) {
+ try {
+ unbind(handle);
+ } catch (Exception e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ wakeup();
+ }
+ }
+ }
+
+ return 0;
+ }
+
+ private int unregisterHandles() {
+ int nHandles = 0;
+ for (; ;) {
+ ServiceOperationFuture request = cancelQueue.poll();
+ if (request == null) {
+ break;
+ }
+
+ // close the channels
+ for (H handle: boundHandles.values()) {
+ try {
+ unbind(handle);
+ wakeup(); // wake up again to trigger thread death
+ } catch (Exception e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ nHandles ++;
+ }
+
+ boundHandles.clear();
+ request.setDone();
+ }
+
+ return nHandles;
+ }
+
+ private void notifyIdleSessions() {
+ // process idle sessions
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - lastIdleCheckTime >= 1000) {
+ lastIdleCheckTime = currentTime;
+ IdleStatusChecker.notifyIdleness(
+ getListeners().getManagedSessions().iterator(),
+ currentTime);
+ }
+ }
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java?rev=596909&r1=596908&r2=596909&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java Tue Nov 20 18:31:56 2007
@@ -288,6 +288,7 @@
// and notify.
future.setDone();
+ return boundHandles.size();
} catch (Exception e) {
future.setException(e);
} finally {
@@ -305,7 +306,7 @@
}
}
- return boundHandles.size();
+ return 0;
}
/**
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoConnector.java?rev=596909&r1=596908&r2=596909&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoConnector.java Tue Nov 20 18:31:56 2007
@@ -89,7 +89,7 @@
protected abstract H newHandle(SocketAddress localAddress) throws Exception;
protected abstract boolean connect(H handle, SocketAddress remoteAddress) throws Exception;
protected abstract void finishConnect(H handle) throws Exception;
- protected abstract T newSession(IoProcessor<T> processor, H handle) throws Exception;
+ protected abstract T newSession(IoProcessor<T> processor, H handle);
protected abstract void destroy(H handle) throws Exception;
protected abstract void wakeup();
protected abstract boolean selectable();
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java?rev=596909&r1=596908&r2=596909&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java Tue Nov 20 18:31:56 2007
@@ -22,41 +22,22 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.nio.channels.ClosedSelectorException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Collection;
import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.mina.common.AbstractIoAcceptor;
+import org.apache.mina.common.AbstractPollingConnectionlessIoAcceptor;
import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.ExpiringSessionRecycler;
-import org.apache.mina.common.IdleStatusChecker;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoBuffer;
-import org.apache.mina.common.IoProcessor;
-import org.apache.mina.common.IoServiceListenerSupport;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoSessionRecycler;
import org.apache.mina.common.RuntimeIoException;
import org.apache.mina.common.TransportMetadata;
-import org.apache.mina.common.WriteRequest;
-import org.apache.mina.common.WriteRequestQueue;
import org.apache.mina.transport.socket.DatagramAcceptor;
import org.apache.mina.transport.socket.DatagramSessionConfig;
import org.apache.mina.transport.socket.DefaultDatagramSessionConfig;
-import org.apache.mina.util.NamePreservingRunnable;
-import org.apache.mina.util.NewThreadExecutor;
/**
* {@link IoAcceptor} for datagram transport (UDP/IP).
@@ -64,51 +45,37 @@
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$
*/
-public class NioDatagramAcceptor extends AbstractIoAcceptor implements DatagramAcceptor {
- private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
+public class NioDatagramAcceptor
+ extends AbstractPollingConnectionlessIoAcceptor<NioSession, DatagramChannel>
+ implements DatagramAcceptor {
- private static final AtomicInteger id = new AtomicInteger();
-
- private final Executor executor;
- private final String threadName;
- private final Selector selector;
- private final IoProcessor<NioSession> processor = new DatagramAcceptorProcessor();
- private final Queue<ServiceOperationFuture> registerQueue = new ConcurrentLinkedQueue<ServiceOperationFuture>();
- private final Queue<ServiceOperationFuture> cancelQueue = new ConcurrentLinkedQueue<ServiceOperationFuture>();
- private final Queue<NioDatagramSession> flushingSessions = new ConcurrentLinkedQueue<NioDatagramSession>();
- private final Map<SocketAddress, DatagramChannel> serverChannels =
- Collections.synchronizedMap(new HashMap<SocketAddress, DatagramChannel>());
-
- private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
-
- private Worker worker;
- private long lastIdleCheckTime;
+ private volatile Selector selector;
/**
* Creates a new instance.
*/
public NioDatagramAcceptor() {
- this(new NewThreadExecutor());
+ super(new DefaultDatagramSessionConfig());
}
/**
* Creates a new instance.
*/
public NioDatagramAcceptor(Executor executor) {
- super(new DefaultDatagramSessionConfig());
-
- threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
-
+ super(new DefaultDatagramSessionConfig(), executor);
+ }
+
+ @Override
+ protected void doInit() {
try {
this.selector = Selector.open();
} catch (IOException e) {
throw new RuntimeIoException("Failed to open a selector.", e);
}
-
- this.executor = executor;
}
- private void disposeNow() {
+ @Override
+ protected void doDispose0() {
if (selector != null) {
try {
selector.close();
@@ -124,467 +91,168 @@
@Override
public DatagramSessionConfig getSessionConfig() {
- return (DatagramSessionConfig) super.getSessionConfig();
+ return super.getSessionConfig();
}
@Override
public InetSocketAddress getLocalAddress() {
- return (InetSocketAddress) super.getLocalAddress();
+ return super.getLocalAddress();
}
+ @Override
public void setLocalAddress(InetSocketAddress localAddress) {
setLocalAddress((SocketAddress) localAddress);
}
@Override
- protected void doBind() throws Exception {
- ServiceOperationFuture request = new ServiceOperationFuture();
-
- registerQueue.add(request);
- startupWorker();
- selector.wakeup();
-
- request.awaitUninterruptibly();
-
- if (request.getException() != null) {
- throw request.getException();
+ protected DatagramChannel bind(SocketAddress localAddress) throws Exception {
+ DatagramChannel c = DatagramChannel.open();
+ boolean success = false;
+ try {
+ DatagramSessionConfig cfg = getSessionConfig();
+ c.socket().setReuseAddress(cfg.isReuseAddress());
+ c.socket().setBroadcast(cfg.isBroadcast());
+ c.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
+ c.socket().setSendBufferSize(cfg.getSendBufferSize());
+
+ if (c.socket().getTrafficClass() != cfg.getTrafficClass()) {
+ c.socket().setTrafficClass(cfg.getTrafficClass());
+ }
+
+ c.configureBlocking(false);
+ c.socket().bind(localAddress);
+ c.register(selector, SelectionKey.OP_READ);
+ success = true;
+ } finally {
+ if (!success) {
+ unbind(c);
+ }
}
- Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
- for (DatagramChannel c: serverChannels.values()) {
- newLocalAddresses.add(c.socket().getLocalSocketAddress());
- }
- setLocalAddresses(newLocalAddresses);
+ return c;
}
@Override
- protected void doUnbind() throws Exception {
- ServiceOperationFuture request = new ServiceOperationFuture();
-
- cancelQueue.add(request);
- startupWorker();
- selector.wakeup();
-
- request.awaitUninterruptibly();
-
- if (request.getException() != null) {
- throw request.getException();
- }
- }
-
- public IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
- if (isDisposed()) {
- throw new IllegalStateException("Already disposed.");
- }
-
- if (remoteAddress == null) {
- throw new NullPointerException("remoteAddress");
+ protected boolean isReadable(DatagramChannel handle) {
+ SelectionKey key = handle.keyFor(selector);
+ if (key == null) {
+ return false;
}
-
- synchronized (bindLock) {
- if (!isActive()) {
- throw new IllegalStateException(
- "Can't create a session from a unbound service.");
- }
-
- return newSessionWithoutLock(remoteAddress, localAddress);
+ if (!key.isValid()) {
+ return false;
}
+ return key.isReadable();
}
- private IoSession newSessionWithoutLock(
- SocketAddress remoteAddress, SocketAddress localAddress) {
- Selector selector = this.selector;
- DatagramChannel ch = serverChannels.get(localAddress);
- if (ch == null) {
- throw new IllegalArgumentException("Unknown local address: " + localAddress);
- }
- SelectionKey key = ch.keyFor(selector);
-
- IoSession session;
- IoSessionRecycler sessionRecycler = getSessionRecycler();
- synchronized (sessionRecycler) {
- session = sessionRecycler.recycle(localAddress, remoteAddress);
- if (session != null) {
- return session;
- }
-
- // If a new session needs to be created.
- NioDatagramSession datagramSession = new NioDatagramSession(
- this, ch, processor, remoteAddress);
- datagramSession.setSelectionKey(key);
-
- getSessionRecycler().put(datagramSession);
- session = datagramSession;
- }
-
- finishSessionInitialization(session, null);
-
- try {
- this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
- getListeners().fireSessionCreated(session);
- } catch (Throwable t) {
- ExceptionMonitor.getInstance().exceptionCaught(t);
+ @Override
+ protected boolean isWritable(DatagramChannel handle) {
+ SelectionKey key = handle.keyFor(selector);
+ if (key == null) {
+ return false;
}
-
- return session;
- }
-
- public IoSessionRecycler getSessionRecycler() {
- return sessionRecycler;
- }
-
- public void setSessionRecycler(IoSessionRecycler sessionRecycler) {
- synchronized (bindLock) {
- if (isActive()) {
- throw new IllegalStateException(
- "sessionRecycler can't be set while the acceptor is bound.");
- }
-
- if (sessionRecycler == null) {
- sessionRecycler = DEFAULT_RECYCLER;
- }
- this.sessionRecycler = sessionRecycler;
+ if (!key.isValid()) {
+ return false;
}
+ return key.isWritable();
}
@Override
- protected IoServiceListenerSupport getListeners() {
- return super.getListeners();
- }
-
- IoProcessor<NioSession> getProcessor() {
- return processor;
- }
-
- private class DatagramAcceptorProcessor implements IoProcessor<NioSession> {
-
- public void add(NioSession session) {
- }
-
- public void flush(NioSession session) {
- if (scheduleFlush((NioDatagramSession) session)) {
- Selector selector = NioDatagramAcceptor.this.selector;
- if (selector != null) {
- selector.wakeup();
- }
- }
- }
-
- public void remove(NioSession session) {
- getSessionRecycler().remove(session);
- getListeners().fireSessionDestroyed(session);
- }
-
- public void updateTrafficMask(NioSession session) {
- throw new UnsupportedOperationException();
- }
-
- public void dispose() {
- // TODO Implement me.
- }
+ protected SocketAddress localAddress(DatagramChannel handle)
+ throws Exception {
+ return handle.socket().getLocalSocketAddress();
}
- private void startupWorker() {
- if (!selector.isOpen()) {
- registerQueue.clear();
- cancelQueue.clear();
- flushingSessions.clear();
- throw new ClosedSelectorException();
- }
- synchronized (this) {
- if (worker == null) {
- worker = new Worker();
- executor.execute(
- new NamePreservingRunnable(worker, threadName));
- }
+ @Override
+ protected NioSession newSession(DatagramChannel handle,
+ SocketAddress remoteAddress) {
+ SelectionKey key = handle.keyFor(selector);
+ if (key == null) {
+ return null;
}
+ NioDatagramSession newSession = new NioDatagramSession(
+ this, handle, getProcessor(), remoteAddress);
+ newSession.setSelectionKey(key);
+
+ return newSession;
}
- private boolean scheduleFlush(NioDatagramSession session) {
- if (session.setScheduledForFlush(true)) {
- flushingSessions.add(session);
- return true;
- } else {
- return false;
- }
+ @Override
+ protected SocketAddress receive(DatagramChannel handle, IoBuffer buffer)
+ throws Exception {
+ return handle.receive(buffer.buf());
}
- private class Worker implements Runnable {
- public void run() {
- lastIdleCheckTime = System.currentTimeMillis();
-
- for (; ;) {
- try {
- int nKeys = selector.select(1000);
-
- registerNew();
-
- if (nKeys > 0) {
- processReadySessions(selector.selectedKeys());
- }
-
- flushSessions();
- cancelKeys();
-
- notifyIdleSessions();
-
- if (selector.keys().isEmpty()) {
- synchronized (NioDatagramAcceptor.this) {
- if (selector.keys().isEmpty()
- && registerQueue.isEmpty()
- && cancelQueue.isEmpty()) {
- worker = null;
- break;
- }
- }
- }
- } catch (IOException e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e1) {
- }
- }
- }
-
- if (isDisposed()) {
- disposeNow();
- }
- }
+ @Override
+ protected boolean select(int timeout) throws Exception {
+ return selector.select(timeout) > 0;
}
- private void processReadySessions(Set<SelectionKey> keys) {
- Iterator<SelectionKey> it = keys.iterator();
- while (it.hasNext()) {
- SelectionKey key = it.next();
- it.remove();
-
- DatagramChannel ch = (DatagramChannel) key.channel();
-
- try {
- if (key.isReadable()) {
- readSession(ch);
- }
-
- if (key.isWritable()) {
- for (IoSession session : getManagedSessions()) {
- scheduleFlush((NioDatagramSession) session);
- }
- }
- } catch (Throwable t) {
- ExceptionMonitor.getInstance().exceptionCaught(t);
- }
- }
+ @Override
+ protected boolean selectable() {
+ return selector.isOpen();
}
- private void readSession(DatagramChannel channel) throws Exception {
- IoBuffer readBuf = IoBuffer.allocate(getSessionConfig()
- .getReceiveBufferSize());
-
- SocketAddress remoteAddress = channel.receive(readBuf.buf());
- if (remoteAddress != null) {
- IoSession session = newSessionWithoutLock(
- remoteAddress, channel.socket().getLocalSocketAddress());
-
- readBuf.flip();
-
- IoBuffer newBuf = IoBuffer.allocate(readBuf.limit());
- newBuf.put(readBuf);
- newBuf.flip();
-
- session.getFilterChain().fireMessageReceived(newBuf);
- }
+ @Override
+ protected Iterator<DatagramChannel> selectedHandles() {
+ return new DatagramChannelIterator(selector.selectedKeys());
}
- private void flushSessions() {
- for (; ;) {
- NioDatagramSession session = flushingSessions.poll();
- if (session == null) {
- break;
- }
-
- session.setScheduledForFlush(false);
-
- try {
- boolean flushedAll = flush(session);
- if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
- !session.isScheduledForFlush()) {
- scheduleFlush(session);
- }
- } catch (IOException e) {
- session.getFilterChain().fireExceptionCaught(e);
- }
- }
+ @Override
+ protected int send(NioSession session, IoBuffer buffer,
+ SocketAddress remoteAddress) throws Exception {
+ return ((DatagramChannel) session.getChannel()).send(
+ buffer.buf(), remoteAddress);
}
- private boolean flush(NioDatagramSession session) throws IOException {
- // Clear OP_WRITE
+ @Override
+ protected void setInterestedInWrite(NioSession session, boolean interested)
+ throws Exception {
SelectionKey key = session.getSelectionKey();
if (key == null) {
- scheduleFlush(session);
- return false;
- }
- if (!key.isValid()) {
- return false;
+ return;
}
- key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-
- DatagramChannel ch = session.getChannel();
- WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
-
- int writtenBytes = 0;
- int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
- for (; ;) {
- WriteRequest req = session.getCurrentWriteRequest();
- if (req == null) {
- req = writeRequestQueue.poll(session);
- if (req == null) {
- break;
- }
- session.setCurrentWriteRequest(req);
- }
-
- IoBuffer buf = (IoBuffer) req.getMessage();
- if (buf.remaining() == 0) {
- // Clear and fire event
- session.setCurrentWriteRequest(null);
- buf.reset();
- session.getFilterChain().fireMessageSent(req);
- continue;
- }
-
- SocketAddress destination = req.getDestination();
- if (destination == null) {
- destination = session.getRemoteAddress();
- }
-
- int localWrittenBytes = ch.send(buf.buf(), destination);
- if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
- // Kernel buffer is full or wrote too much
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- return false;
- } else {
- key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-
- // Clear and fire event
- session.setCurrentWriteRequest(null);
- writtenBytes += localWrittenBytes;
- buf.reset();
- session.getFilterChain().fireMessageSent(req);
- }
+
+ if (interested) {
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ } else {
+ key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
}
-
- return true;
}
- private void registerNew() {
- if (registerQueue.isEmpty()) {
- return;
+ @Override
+ protected void unbind(DatagramChannel handle) throws Exception {
+ SelectionKey key = handle.keyFor(selector);
+ if (key != null) {
+ key.cancel();
}
+ handle.disconnect();
+ handle.close();
+ }
- for (; ;) {
- ServiceOperationFuture req = registerQueue.poll();
- if (req == null) {
- break;
- }
-
- Map<SocketAddress, DatagramChannel> newServerChannels =
- new HashMap<SocketAddress, DatagramChannel>();
- List<SocketAddress> localAddresses = getLocalAddresses();
-
- try {
- for (SocketAddress a: localAddresses) {
- DatagramChannel c = null;
- boolean success = false;
- try {
- c = DatagramChannel.open();
- DatagramSessionConfig cfg = getSessionConfig();
- c.socket().setReuseAddress(cfg.isReuseAddress());
- c.socket().setBroadcast(cfg.isBroadcast());
- c.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
- c.socket().setSendBufferSize(cfg.getSendBufferSize());
+ @Override
+ protected void wakeup() {
+ selector.wakeup();
+ }
+
+ private static class DatagramChannelIterator implements Iterator<DatagramChannel> {
- if (c.socket().getTrafficClass() != cfg.getTrafficClass()) {
- c.socket().setTrafficClass(cfg.getTrafficClass());
- }
+ private final Iterator<SelectionKey> i;
- c.configureBlocking(false);
- c.socket().bind(a);
- c.register(selector, SelectionKey.OP_READ, req);
- success = true;
- } finally {
- if (c != null && !success) {
- try {
- c.disconnect();
- c.close();
- } catch (Throwable e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- }
- }
-
- newServerChannels.put(c.socket().getLocalSocketAddress(), c);
- }
-
- serverChannels.putAll(newServerChannels);
-
- getListeners().fireServiceActivated();
- req.setDone();
- } catch (Exception e) {
- req.setException(e);
- } finally {
- // Roll back if failed to bind all addresses.
- if (req.getException() != null) {
- for (DatagramChannel c: newServerChannels.values()) {
- c.keyFor(selector).cancel();
- try {
- c.disconnect();
- c.close();
- } catch (IOException e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- }
- selector.wakeup();
- }
- }
+ private DatagramChannelIterator(Collection<SelectionKey> keys) {
+ this.i = keys.iterator();
+ }
+
+ public boolean hasNext() {
+ return i.hasNext();
}
- }
-
- private void cancelKeys() {
- for (; ;) {
- ServiceOperationFuture request = cancelQueue.poll();
- if (request == null) {
- break;
- }
- // close the channels
- for (DatagramChannel c: serverChannels.values()) {
- try {
- SelectionKey key = c.keyFor(selector);
- key.cancel();
-
- selector.wakeup(); // wake up again to trigger thread death
- c.disconnect();
- c.close();
- } catch (IOException e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- }
-
- serverChannels.clear();
- request.setDone();
+ public DatagramChannel next() {
+ return (DatagramChannel) i.next().channel();
}
- }
- private void notifyIdleSessions() {
- // process idle sessions
- long currentTime = System.currentTimeMillis();
- if (currentTime - lastIdleCheckTime >= 1000) {
- lastIdleCheckTime = currentTime;
- IdleStatusChecker.notifyIdleness(
- getListeners().getManagedSessions().iterator(),
- currentTime);
+ public void remove() {
+ i.remove();
}
+
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java?rev=596909&r1=596908&r2=596909&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java Tue Nov 20 18:31:56 2007
@@ -99,7 +99,7 @@
@Override
protected NioSession newSession(IoProcessor<NioSession> processor,
- DatagramChannel handle) throws Exception {
+ DatagramChannel handle) {
return new NioDatagramSession(this, handle, processor);
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java?rev=596909&r1=596908&r2=596909&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java Tue Nov 20 18:31:56 2007
@@ -149,8 +149,7 @@
}
@Override
- protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle)
- throws Exception {
+ protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle) {
return new NioSocketSession(this, processor, handle);
}
Modified: mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java?rev=596909&r1=596908&r2=596909&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java Tue Nov 20 18:31:56 2007
@@ -30,6 +30,7 @@
import org.apache.mina.common.IoBuffer;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
import org.apache.mina.util.AvailablePortFinder;
/**
@@ -138,8 +139,9 @@
acceptorHandler.session = null;
// Write whatever to trigger the acceptor again.
- future.getSession().write(IoBuffer.allocate(1))
- .awaitUninterruptibly();
+ WriteFuture wf = future.getSession().write(
+ IoBuffer.allocate(1)).awaitUninterruptibly();
+ Assert.assertTrue(wf.isWritten());
// Make sure the connection is closed before recycler closes it.
while (acceptorHandler.session == null) {