You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/11/21 03:31:57 UTC

svn commit: r596909 - in /mina/trunk/core/src: main/java/org/apache/mina/common/ main/java/org/apache/mina/transport/socket/nio/ test/java/org/apache/mina/transport/socket/nio/

Author: trustin
Date: Tue Nov 20 18:31:56 2007
New Revision: 596909

URL: http://svn.apache.org/viewvc?rev=596909&view=rev
Log:
* Added an abstraction layer for connectionless polling acceptors - AbstractPollingConnectionlessIoAcceptor
* NioDatagramAcceptor now extends AbstractPollingConnectionlessIoAcceptor


Added:
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java   (with props)
Modified:
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoConnector.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java
    mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java

Added: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java?rev=596909&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java Tue Nov 20 18:31:56 2007
@@ -0,0 +1,521 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.ClosedSelectorException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.transport.socket.DatagramSessionConfig;
+import org.apache.mina.util.NamePreservingRunnable;
+import org.apache.mina.util.NewThreadExecutor;
+
+/**
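+ * A base {@link IoAcceptor} implementation for connectionless (datagram)
+ * transports that are driven by polling.  Subclasses supply the
+ * transport-specific operations (selecting, binding, receiving, sending)
+ * through the abstract methods declared below.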
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public abstract class AbstractPollingConnectionlessIoAcceptor<T extends AbstractIoSession, H>
+        extends AbstractIoAcceptor {
+
+    private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
+
+    private static final AtomicInteger id = new AtomicInteger();
+
+    private final Executor executor;
+    private final String threadName;
+    private final IoProcessor<T> processor = new ConnectionlessAcceptorProcessor();
+    private final Queue<ServiceOperationFuture> registerQueue = new ConcurrentLinkedQueue<ServiceOperationFuture>();
+    private final Queue<ServiceOperationFuture> cancelQueue = new ConcurrentLinkedQueue<ServiceOperationFuture>();
+    private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
+    private final Map<SocketAddress, H> boundHandles =
+        Collections.synchronizedMap(new HashMap<SocketAddress, H>());
+
+    private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
+
+    private Worker worker;
+    private long lastIdleCheckTime;
+
+    /**
+     * Creates a new instance that runs its worker on a freshly created
+     * {@link NewThreadExecutor}.
+     *
+     * @param sessionConfig the session configuration passed on to the
+     *                      two-argument constructor
+     */
+    protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig) {
+        this(sessionConfig, new NewThreadExecutor());
+    }
+
+    /**
+     * Creates a new instance.
+     * <p>
+     * Note that {@link #doInit()} is invoked from this constructor, so a
+     * subclass implementation runs before the subclass' own field
+     * initializers and constructor body have executed.
+     *
+     * @param sessionConfig the session configuration handed to the superclass
+     * @param executor      the executor that runs the worker of this acceptor
+     * @throws NullPointerException if <tt>executor</tt> is <tt>null</tt>
+     */
+    protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {
+        super(sessionConfig);
+
+        // Fail fast: a null executor would otherwise only show up as a
+        // NullPointerException inside startupWorker() on the first bind.
+        // The check precedes doInit() so nothing is opened if it fails.
+        if (executor == null) {
+            throw new NullPointerException("executor");
+        }
+
+        threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
+        this.executor = executor;
+
+        doInit();
+    }
+
+    // ---- Transport-specific hooks implemented by subclasses ----
+
+    /** Invoked once, as the last step of the constructor. */
+    protected abstract void doInit();
+    /** Invoked by the worker after its loop has ended, if this service is disposed. */
+    protected abstract void doDispose0();
+    /** When this returns <tt>false</tt>, starting the worker fails with a ClosedSelectorException. */
+    protected abstract boolean selectable();
+    /**
+     * Called by the worker with a timeout of 1000 (presumably milliseconds -
+     * confirm in the implementation).  A <tt>true</tt> result makes the
+     * worker process {@link #selectedHandles()}.
+     */
+    protected abstract boolean select(int timeout) throws Exception;
+    /** Called after a bind, unbind or flush request has been queued for the worker. */
+    protected abstract void wakeup();
+    /** The returned iterator must support remove(); each handle is removed as it is processed. */
+    protected abstract Iterator<H> selectedHandles();
+    /** Called once for every configured local address while a bind request is served. */
+    protected abstract H bind(SocketAddress localAddress) throws Exception;
+    /** Called when a bind request is rolled back and when an unbind request is served. */
+    protected abstract void unbind(H handle) throws Exception;
+    /** The result is used as the key under which the handle is stored once bound. */
+    protected abstract SocketAddress localAddress(H handle) throws Exception;
+    /** Checked for each selected handle; a readable handle is read from. */
+    protected abstract boolean isReadable(H handle);
+    /** Checked for each selected handle; a writable handle schedules a flush of all managed sessions. */
+    protected abstract boolean isWritable(H handle);
+    /**
+     * Returns the address of the sender; a <tt>null</tt> result makes the
+     * caller discard the read without creating or notifying a session.
+     */
+    protected abstract SocketAddress receive(H handle, IoBuffer buffer) throws Exception;
+    /** The result is used as the number of bytes written; 0 is treated as "kernel buffer is full". */
+    protected abstract int send(T session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception;
+    /** Called when the session recycler has no session to reuse for the address pair. */
+    protected abstract T newSession(H handle, SocketAddress remoteAddress);
+    /** Called with <tt>false</tt> before writing and with <tt>true</tt> when a write could not complete. */
+    protected abstract void setInterestedInWrite(T session, boolean interested) throws Exception;
+
+
+    /**
+     * Returns the session configuration of this service, narrowed to
+     * {@link DatagramSessionConfig}.
+     */
+    @Override
+    public DatagramSessionConfig getSessionConfig() {
+        IoSessionConfig config = super.getSessionConfig();
+        return (DatagramSessionConfig) config;
+    }
+
+    /**
+     * Returns the local address of this service, narrowed to
+     * {@link InetSocketAddress}.
+     */
+    @Override
+    public InetSocketAddress getLocalAddress() {
+        SocketAddress address = super.getLocalAddress();
+        return (InetSocketAddress) address;
+    }
+
+    /**
+     * Type-safe overload that forwards to the {@link SocketAddress}
+     * variant of this setter.
+     */
+    public void setLocalAddress(InetSocketAddress localAddress) {
+        // Widen the static type so that the SocketAddress overload is chosen.
+        SocketAddress address = localAddress;
+        setLocalAddress(address);
+    }
+
+    /**
+     * Queues a bind request for the worker, starts the worker if necessary
+     * and blocks until the request has been served.  On success the local
+     * addresses of this service are replaced with the addresses reported by
+     * the bound handles.
+     *
+     * @throws Exception the exception recorded on the request by the worker,
+     *                   or any exception raised while querying a handle for
+     *                   its local address
+     */
+    @Override
+    protected void doBind() throws Exception {
+        ServiceOperationFuture request = new ServiceOperationFuture();
+
+        registerQueue.add(request);
+        startupWorker();
+        wakeup();
+
+        request.awaitUninterruptibly();
+
+        if (request.getException() != null) {
+            throw request.getException();
+        }
+
+        Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
+        // boundHandles is a Collections.synchronizedMap and is modified by
+        // the worker thread; iterating one of its views is only safe while
+        // holding the map's own monitor.
+        synchronized (boundHandles) {
+            for (H handle: boundHandles.values()) {
+                newLocalAddresses.add(localAddress(handle));
+            }
+        }
+        setLocalAddresses(newLocalAddresses);
+    }
+
+    /**
+     * Queues an unbind request for the worker, makes sure the worker is
+     * running and blocks until the request has been served.
+     *
+     * @throws Exception the exception recorded on the request, if any
+     */
+    @Override
+    protected void doUnbind() throws Exception {
+        ServiceOperationFuture future = new ServiceOperationFuture();
+        cancelQueue.add(future);
+
+        startupWorker();
+        wakeup();
+        future.awaitUninterruptibly();
+
+        Exception failure = future.getException();
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /**
+     * Returns a session for the specified address pair while holding the
+     * bind lock, reusing a recycled session if one is available.
+     *
+     * @throws IllegalStateException if this service is disposed or not active
+     * @throws NullPointerException  if <tt>remoteAddress</tt> is <tt>null</tt>
+     */
+    public IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
+        if (isDisposed()) {
+            throw new IllegalStateException("Already disposed.");
+        }
+        if (remoteAddress == null) {
+            throw new NullPointerException("remoteAddress");
+        }
+
+        synchronized (bindLock) {
+            if (isActive()) {
+                return newSessionWithoutLock(remoteAddress, localAddress);
+            }
+
+            throw new IllegalStateException(
+                    "Can't create a session from a unbound service.");
+        }
+    }
+
+    /**
+     * Returns the session for the specified address pair without taking the
+     * bind lock.  A session offered by the session recycler is returned
+     * as-is; otherwise a new session is created on the handle bound to
+     * <tt>localAddress</tt>, registered with the recycler, initialized and
+     * announced to the listeners.
+     *
+     * @throws IllegalArgumentException if no handle is bound to
+     *                                  <tt>localAddress</tt>
+     */
+    private IoSession newSessionWithoutLock(
+            SocketAddress remoteAddress, SocketAddress localAddress) {
+        H handle = boundHandles.get(localAddress);
+        if (handle == null) {
+            throw new IllegalArgumentException("Unknown local address: " + localAddress);
+        }
+
+        IoSession session;
+        // Read the recycler exactly once so that the lock, the lookup and
+        // the registration below all refer to the same instance.
+        IoSessionRecycler recycler = getSessionRecycler();
+        synchronized (recycler) {
+            session = recycler.recycle(localAddress, remoteAddress);
+            if (session != null) {
+                return session;
+            }
+
+            // If a new session needs to be created.
+            T newSession = newSession(handle, remoteAddress);
+            recycler.put(newSession);
+            session = newSession;
+        }
+
+        finishSessionInitialization(session, null);
+
+        try {
+            this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+            getListeners().fireSessionCreated(session);
+        } catch (Throwable t) {
+            // A failure here is reported but the session is still returned.
+            ExceptionMonitor.getInstance().exceptionCaught(t);
+        }
+
+        return session;
+    }
+
+    /**
+     * Returns the recycler currently used to look up and store sessions.
+     */
+    public IoSessionRecycler getSessionRecycler() {
+        return this.sessionRecycler;
+    }
+
+    /**
+     * Replaces the session recycler.  Passing <tt>null</tt> restores the
+     * default recycler.
+     *
+     * @throws IllegalStateException if this service is active
+     */
+    public void setSessionRecycler(IoSessionRecycler sessionRecycler) {
+        synchronized (bindLock) {
+            if (isActive()) {
+                throw new IllegalStateException(
+                        "sessionRecycler can't be set while the acceptor is bound.");
+            }
+
+            this.sessionRecycler =
+                sessionRecycler != null ? sessionRecycler : DEFAULT_RECYCLER;
+        }
+    }
+
+    /**
+     * Returns the listener support object of the superclass unchanged.
+     */
+    @Override
+    protected IoServiceListenerSupport getListeners() {
+        IoServiceListenerSupport listeners = super.getListeners();
+        return listeners;
+    }
+
+    /**
+     * Returns the processor instance owned by this acceptor.
+     */
+    protected IoProcessor<T> getProcessor() {
+        return this.processor;
+    }
+
+    /**
+     * {@link IoProcessor} that routes flush and remove requests for the
+     * sessions of the enclosing acceptor back to the acceptor itself.
+     */
+    private class ConnectionlessAcceptorProcessor implements IoProcessor<T> {
+
+        /** Does nothing. */
+        public void add(T session) {
+        }
+
+        /** Queues the session for flushing and wakes the worker if it was newly queued. */
+        public void flush(T session) {
+            boolean newlyScheduled = scheduleFlush(session);
+            if (!newlyScheduled) {
+                return;
+            }
+            wakeup();
+        }
+
+        /** Removes the session from the recycler and fires the session-destroyed event. */
+        public void remove(T session) {
+            IoSessionRecycler recycler = getSessionRecycler();
+            recycler.remove(session);
+            getListeners().fireSessionDestroyed(session);
+        }
+
+        /** Always throws {@link UnsupportedOperationException}. */
+        public void updateTrafficMask(T session) {
+            throw new UnsupportedOperationException();
+        }
+
+        /** Does nothing. */
+        public void dispose() {
+        }
+    }
+
+    /**
+     * Starts the worker unless one is already registered.
+     *
+     * @throws ClosedSelectorException if {@link #selectable()} returns
+     *         <tt>false</tt>; all pending requests and scheduled flushes
+     *         are discarded first
+     */
+    private void startupWorker() {
+        if (!selectable()) {
+            registerQueue.clear();
+            cancelQueue.clear();
+            flushingSessions.clear();
+            throw new ClosedSelectorException();
+        }
+
+        synchronized (this) {
+            if (worker != null) {
+                return;
+            }
+
+            // Publish the worker before handing it to the executor.
+            Worker newWorker = new Worker();
+            worker = newWorker;
+            executor.execute(new NamePreservingRunnable(newWorker, threadName));
+        }
+    }
+
+    /**
+     * Marks the session as scheduled for flush and, if the session accepted
+     * the mark, adds it to the flush queue.
+     *
+     * @return the value returned by <tt>setScheduledForFlush(true)</tt>
+     */
+    private boolean scheduleFlush(T session) {
+        boolean newlyScheduled = session.setScheduledForFlush(true);
+        if (newlyScheduled) {
+            flushingSessions.add(session);
+        }
+        return newlyScheduled;
+    }
+
+    /**
+     * The polling loop of this acceptor.  Each pass selects, serves bind
+     * requests, processes the selected handles, flushes scheduled sessions,
+     * serves unbind requests and checks for idle sessions.
+     */
+    private class Worker implements Runnable {
+        public void run() {
+            // Running balance of the values returned by registerHandles()
+            // and unregisterHandles(); the loop may end when it reaches 0.
+            int nHandles = 0;
+            lastIdleCheckTime = System.currentTimeMillis();
+
+            for (; ;) {
+                try {
+                    boolean selected = select(1000);
+
+                    nHandles += registerHandles();
+
+                    if (selected) {
+                        processReadySessions(selectedHandles());
+                    }
+
+                    flushSessions();
+                    nHandles -= unregisterHandles();
+
+                    notifyIdleSessions();
+
+                    if (nHandles == 0) {
+                        // Same monitor as startupWorker(): the worker is only
+                        // deregistered while no request is waiting to be served.
+                        synchronized (AbstractPollingConnectionlessIoAcceptor.this) {
+                            if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
+                                worker = null;
+                                break;
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    // Report the failure, then pause before the next pass.
+                    ExceptionMonitor.getInstance().exceptionCaught(e);
+
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e1) {
+                        // Ignored: the loop simply continues with the next pass.
+                    }
+                }
+            }
+            
+            // Release the transport resources once the loop has ended on a
+            // disposed service.
+            if (isDisposed()) {
+                doDispose0();
+            }
+        }
+    }
+
+    /**
+     * Consumes the selected handles one by one: a readable handle is read
+     * from, and a writable handle schedules a flush for every managed
+     * session.  Failures are reported to the {@link ExceptionMonitor} and
+     * do not stop the iteration.
+     */
+    @SuppressWarnings("unchecked")
+    private void processReadySessions(Iterator<H> handles) {
+        for (Iterator<H> it = handles; it.hasNext();) {
+            H handle = it.next();
+            it.remove();
+
+            try {
+                if (isReadable(handle)) {
+                    readHandle(handle);
+                }
+
+                if (!isWritable(handle)) {
+                    continue;
+                }
+
+                for (IoSession managed : getManagedSessions()) {
+                    scheduleFlush((T) managed);
+                }
+            } catch (Throwable t) {
+                ExceptionMonitor.getInstance().exceptionCaught(t);
+            }
+        }
+    }
+
+    /**
+     * Receives one message from the handle and, if a sender address was
+     * reported, fires a message-received event on the session that belongs
+     * to that sender.  The payload is copied into a buffer that is exactly
+     * as large as the received data.
+     */
+    private void readHandle(H handle) throws Exception {
+        int capacity = getSessionConfig().getReceiveBufferSize();
+        IoBuffer received = IoBuffer.allocate(capacity);
+
+        SocketAddress sender = receive(handle, received);
+        if (sender == null) {
+            // Nothing was received.
+            return;
+        }
+
+        IoSession session = newSessionWithoutLock(sender, localAddress(handle));
+
+        received.flip();
+
+        IoBuffer message = IoBuffer.allocate(received.limit());
+        message.put(received);
+        message.flip();
+
+        session.getFilterChain().fireMessageReceived(message);
+    }
+
+    /**
+     * Drains the flush queue.  Every polled session has its scheduled mark
+     * cleared and is flushed; it is scheduled again when the flush completed
+     * but its write request queue is not empty and nobody else has scheduled
+     * it in the meantime.  Exceptions are fired into the filter chain of the
+     * affected session.
+     */
+    private void flushSessions() {
+        for (T session = flushingSessions.poll();
+             session != null;
+             session = flushingSessions.poll()) {
+
+            session.setScheduledForFlush(false);
+
+            try {
+                boolean flushedAll = flush(session);
+                boolean needsAnotherPass =
+                    flushedAll &&
+                    !session.getWriteRequestQueue().isEmpty(session) &&
+                    !session.isScheduledForFlush();
+
+                if (needsAnotherPass) {
+                    scheduleFlush(session);
+                }
+            } catch (Exception e) {
+                session.getFilterChain().fireExceptionCaught(e);
+            }
+        }
+    }
+
+    /**
+     * Writes the pending write requests of the session until the queue is
+     * empty, the transport accepts no more data or the per-call byte budget
+     * is used up.
+     *
+     * @return <tt>true</tt> if the loop ended because no write request was
+     *         left; <tt>false</tt> if it stopped early, in which case write
+     *         interest has been switched on for the session
+     */
+    private boolean flush(T session) throws Exception {
+        // Clear OP_WRITE
+        setInterestedInWrite(session, false);
+        
+        WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
+
+        // Budget for this call: 1.5 times the configured value.
+        // NOTE(review): the cap is derived from the max *read* buffer size -
+        // confirm that this is intended for a write limit.
+        int maxWrittenBytes =
+            session.getConfig().getMaxReadBufferSize() +
+            (session.getConfig().getMaxReadBufferSize() >>> 1);
+
+        int writtenBytes = 0;
+        for (; ;) {
+            // Resume the request left over from an earlier call, if any;
+            // otherwise take the next one from the queue.
+            WriteRequest req = session.getCurrentWriteRequest();
+            if (req == null) {
+                req = writeRequestQueue.poll(session);
+                if (req == null) {
+                    break;
+                }
+                session.setCurrentWriteRequest(req);
+            }
+
+            IoBuffer buf = (IoBuffer) req.getMessage();
+            if (buf.remaining() == 0) {
+                // Clear and fire event
+                session.setCurrentWriteRequest(null);
+                // NOTE(review): presumably returns to a mark set before the
+                // request was queued - confirm against the caller.
+                buf.reset();
+                session.getFilterChain().fireMessageSent(req);
+                continue;
+            }
+
+            // A request without an explicit destination goes to the
+            // remote address of the session.
+            SocketAddress destination = req.getDestination();
+            if (destination == null) {
+                destination = session.getRemoteAddress();
+            }
+
+            int localWrittenBytes = send(session, buf, destination);
+            if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
+                // Kernel buffer is full or wrote too much
+                // The request stays the current write request of the session.
+                setInterestedInWrite(session, true);
+                return false;
+            } else {
+                setInterestedInWrite(session, false);
+
+                // Clear and fire event
+                session.setCurrentWriteRequest(null);
+                writtenBytes += localWrittenBytes;
+                buf.reset();
+                session.getFilterChain().fireMessageSent(req);
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Serves queued bind requests.  For a request, every configured local
+     * address is bound; if all succeed the handles are recorded, the
+     * service-activated event is fired and the request is completed.  If an
+     * exception occurs, it is recorded on the request and the handles bound
+     * for that request are unbound again.
+     *
+     * @return the size of the bound-handle map after the first request that
+     *         succeeded (requests queued behind it are left for a later
+     *         call), or 0 if the queue was empty or no request succeeded
+     */
+    private int registerHandles() {
+        if (registerQueue.isEmpty()) {
+            return 0;
+        }
+
+        for (; ;) {
+            ServiceOperationFuture req = registerQueue.poll();
+            if (req == null) {
+                break;
+            }
+
+            // Handles bound for this request only, so that a failure can be
+            // rolled back without touching earlier bindings.
+            Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
+            List<SocketAddress> localAddresses = getLocalAddresses();
+            try {
+                for (SocketAddress a: localAddresses) {
+                    H handle = bind(a);
+                    newHandles.put(localAddress(handle), handle);
+                }
+                
+                boundHandles.putAll(newHandles);
+                
+                getListeners().fireServiceActivated();
+                req.setDone();
+                return boundHandles.size();
+            } catch (Exception e) {
+                req.setException(e);
+            } finally {
+                // Roll back if failed to bind all addresses.
+                if (req.getException() != null) {
+                    for (H handle: newHandles.values()) {
+                        try {
+                            unbind(handle);
+                        } catch (Exception e) {
+                            ExceptionMonitor.getInstance().exceptionCaught(e);
+                        }
+                    }
+                    wakeup();
+                }
+            }
+        }
+        
+        return 0;
+    }
+
+    /**
+     * Serves queued unbind requests.  For each request every bound handle
+     * is unbound, the bound-handle map is emptied and the request is
+     * completed.  A failing unbind is reported to the
+     * {@link ExceptionMonitor} and the handle is counted nevertheless.
+     *
+     * @return the number of handles that were processed
+     */
+    private int unregisterHandles() {
+        int processed = 0;
+
+        ServiceOperationFuture request;
+        while ((request = cancelQueue.poll()) != null) {
+            // close the channels
+            for (H handle : boundHandles.values()) {
+                try {
+                    unbind(handle);
+                    wakeup(); // wake up again to trigger thread death
+                } catch (Exception e) {
+                    ExceptionMonitor.getInstance().exceptionCaught(e);
+                }
+                processed++;
+            }
+
+            boundHandles.clear();
+            request.setDone();
+        }
+
+        return processed;
+    }
+
+    /**
+     * Runs the idle check on all managed sessions, but only if at least
+     * 1000 milliseconds have passed since the previous check.
+     */
+    private void notifyIdleSessions() {
+        long now = System.currentTimeMillis();
+        if (now - lastIdleCheckTime < 1000) {
+            return;
+        }
+
+        lastIdleCheckTime = now;
+        IdleStatusChecker.notifyIdleness(
+                getListeners().getManagedSessions().iterator(),
+                now);
+    }
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingConnectionlessIoAcceptor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java?rev=596909&r1=596908&r2=596909&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java Tue Nov 20 18:31:56 2007
@@ -288,6 +288,7 @@
 
                 // and notify.
                 future.setDone();
+                return boundHandles.size();
             } catch (Exception e) {
                 future.setException(e);
             } finally {
@@ -305,7 +306,7 @@
             }
         }
         
-        return boundHandles.size();
+        return 0;
     }
 
     /**

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoConnector.java?rev=596909&r1=596908&r2=596909&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoConnector.java Tue Nov 20 18:31:56 2007
@@ -89,7 +89,7 @@
     protected abstract H newHandle(SocketAddress localAddress) throws Exception;
     protected abstract boolean connect(H handle, SocketAddress remoteAddress) throws Exception;
     protected abstract void finishConnect(H handle) throws Exception;
-    protected abstract T newSession(IoProcessor<T> processor, H handle) throws Exception;
+    protected abstract T newSession(IoProcessor<T> processor, H handle);
     protected abstract void destroy(H handle) throws Exception;
     protected abstract void wakeup();
     protected abstract boolean selectable();

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java?rev=596909&r1=596908&r2=596909&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java Tue Nov 20 18:31:56 2007
@@ -22,41 +22,22 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.DatagramChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Collection;
 import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.mina.common.AbstractIoAcceptor;
+import org.apache.mina.common.AbstractPollingConnectionlessIoAcceptor;
 import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.ExpiringSessionRecycler;
-import org.apache.mina.common.IdleStatusChecker;
 import org.apache.mina.common.IoAcceptor;
 import org.apache.mina.common.IoBuffer;
-import org.apache.mina.common.IoProcessor;
-import org.apache.mina.common.IoServiceListenerSupport;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoSessionRecycler;
 import org.apache.mina.common.RuntimeIoException;
 import org.apache.mina.common.TransportMetadata;
-import org.apache.mina.common.WriteRequest;
-import org.apache.mina.common.WriteRequestQueue;
 import org.apache.mina.transport.socket.DatagramAcceptor;
 import org.apache.mina.transport.socket.DatagramSessionConfig;
 import org.apache.mina.transport.socket.DefaultDatagramSessionConfig;
-import org.apache.mina.util.NamePreservingRunnable;
-import org.apache.mina.util.NewThreadExecutor;
 
 /**
  * {@link IoAcceptor} for datagram transport (UDP/IP).
@@ -64,51 +45,37 @@
  * @author The Apache MINA Project (dev@mina.apache.org)
  * @version $Rev$, $Date$
  */
-public class NioDatagramAcceptor extends AbstractIoAcceptor implements DatagramAcceptor {
-    private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
+public class NioDatagramAcceptor
+        extends AbstractPollingConnectionlessIoAcceptor<NioSession, DatagramChannel>
+        implements DatagramAcceptor {
 
-    private static final AtomicInteger id = new AtomicInteger();
-
-    private final Executor executor;
-    private final String threadName;
-    private final Selector selector;
-    private final IoProcessor<NioSession> processor = new DatagramAcceptorProcessor();
-    private final Queue<ServiceOperationFuture> registerQueue = new ConcurrentLinkedQueue<ServiceOperationFuture>();
-    private final Queue<ServiceOperationFuture> cancelQueue = new ConcurrentLinkedQueue<ServiceOperationFuture>();
-    private final Queue<NioDatagramSession> flushingSessions = new ConcurrentLinkedQueue<NioDatagramSession>();
-    private final Map<SocketAddress, DatagramChannel> serverChannels =
-        Collections.synchronizedMap(new HashMap<SocketAddress, DatagramChannel>());
-
-    private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
-
-    private Worker worker;
-    private long lastIdleCheckTime;
+    private volatile Selector selector;
 
     /**
      * Creates a new instance.
      */
     public NioDatagramAcceptor() {
-        this(new NewThreadExecutor());
+        super(new DefaultDatagramSessionConfig());
     }
 
     /**
      * Creates a new instance.
      */
     public NioDatagramAcceptor(Executor executor) {
-        super(new DefaultDatagramSessionConfig());
-
-        threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
-
+        super(new DefaultDatagramSessionConfig(), executor);
+    }
+    
+    @Override
+    protected void doInit() {
         try {
             this.selector = Selector.open();
         } catch (IOException e) {
             throw new RuntimeIoException("Failed to open a selector.", e);
         }
-
-        this.executor = executor;
     }
 
-    private void disposeNow() {
+    @Override
+    protected void doDispose0() {
         if (selector != null) {
             try {
                 selector.close();
@@ -124,467 +91,168 @@
 
     @Override
     public DatagramSessionConfig getSessionConfig() {
-        return (DatagramSessionConfig) super.getSessionConfig();
+        return super.getSessionConfig(); // NOTE(review): cast dropped; compiles only if the superclass already returns a DatagramSessionConfig-compatible type — confirm
     }
 
     @Override
     public InetSocketAddress getLocalAddress() {
-        return (InetSocketAddress) super.getLocalAddress();
+        return super.getLocalAddress(); // NOTE(review): cast dropped; compiles only if the superclass already returns an InetSocketAddress-compatible type — confirm
     }
 
+    @Override // typed overload, presumably declared by DatagramAcceptor (confirm); the cast below selects the SocketAddress variant instead of recursing
     public void setLocalAddress(InetSocketAddress localAddress) {
         setLocalAddress((SocketAddress) localAddress);
     }
 
     @Override
-    protected void doBind() throws Exception {
-        ServiceOperationFuture request = new ServiceOperationFuture();
-
-        registerQueue.add(request);
-        startupWorker();
-        selector.wakeup();
-
-        request.awaitUninterruptibly();
-
-        if (request.getException() != null) {
-            throw request.getException();
+    protected DatagramChannel bind(SocketAddress localAddress) throws Exception { // opens, configures and binds a non-blocking channel, then registers it for OP_READ
+        DatagramChannel c = DatagramChannel.open();
+        boolean success = false; // set to true only after registration; drives the cleanup in finally
+        try {
+            DatagramSessionConfig cfg = getSessionConfig(); // socket options are copied from the acceptor's session config
+            c.socket().setReuseAddress(cfg.isReuseAddress());
+            c.socket().setBroadcast(cfg.isBroadcast());
+            c.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
+            c.socket().setSendBufferSize(cfg.getSendBufferSize());
+    
+            if (c.socket().getTrafficClass() != cfg.getTrafficClass()) { // touch the traffic class only when it differs from the socket's current value
+                c.socket().setTrafficClass(cfg.getTrafficClass());
+            }
+    
+            c.configureBlocking(false);
+            c.socket().bind(localAddress);
+            c.register(selector, SelectionKey.OP_READ);
+            success = true;
+        } finally {
+            if (!success) {
+                unbind(c); // roll back the partially initialised channel
+            }
         }
 
-        Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
-        for (DatagramChannel c: serverChannels.values()) {
-            newLocalAddresses.add(c.socket().getLocalSocketAddress());
-        }
-        setLocalAddresses(newLocalAddresses);
+        return c;
     }
 
     @Override
-    protected void doUnbind() throws Exception {
-        ServiceOperationFuture request = new ServiceOperationFuture();
-
-        cancelQueue.add(request);
-        startupWorker();
-        selector.wakeup();
-
-        request.awaitUninterruptibly();
-
-        if (request.getException() != null) {
-            throw request.getException();
-        }
-    }
-
-    public IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
-        if (isDisposed()) {
-            throw new IllegalStateException("Already disposed.");
-        }
-
-        if (remoteAddress == null) {
-            throw new NullPointerException("remoteAddress");
+    protected boolean isReadable(DatagramChannel handle) { // true only if the handle has a valid key on this selector that is ready for reading
+        SelectionKey key = handle.keyFor(selector);
+        if (key == null) { // handle is not registered with this selector
+            return false;
         }
-
-        synchronized (bindLock) {
-            if (!isActive()) {
-                throw new IllegalStateException(
-                        "Can't create a session from a unbound service.");
-            }
-
-            return newSessionWithoutLock(remoteAddress, localAddress);
+        if (!key.isValid()) { // key is no longer valid (e.g. it was cancelled)
+            return false;
         }
+        return key.isReadable();
     }
 
-    private IoSession newSessionWithoutLock(
-            SocketAddress remoteAddress, SocketAddress localAddress) {
-        Selector selector = this.selector;
-        DatagramChannel ch = serverChannels.get(localAddress);
-        if (ch == null) {
-            throw new IllegalArgumentException("Unknown local address: " + localAddress);
-        }
-        SelectionKey key = ch.keyFor(selector);
-
-        IoSession session;
-        IoSessionRecycler sessionRecycler = getSessionRecycler();
-        synchronized (sessionRecycler) {
-            session = sessionRecycler.recycle(localAddress, remoteAddress);
-            if (session != null) {
-                return session;
-            }
-
-            // If a new session needs to be created.
-            NioDatagramSession datagramSession = new NioDatagramSession(
-                    this, ch, processor, remoteAddress);
-            datagramSession.setSelectionKey(key);
-
-            getSessionRecycler().put(datagramSession);
-            session = datagramSession;
-        }
-
-        finishSessionInitialization(session, null);
-
-        try {
-            this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
-            getListeners().fireSessionCreated(session);
-        } catch (Throwable t) {
-            ExceptionMonitor.getInstance().exceptionCaught(t);
+    @Override
+    protected boolean isWritable(DatagramChannel handle) { // true only if the handle has a valid key on this selector that is ready for writing
+        SelectionKey key = handle.keyFor(selector);
+        if (key == null) { // handle is not registered with this selector
+            return false;
         }
-
-        return session;
-    }
-
-    public IoSessionRecycler getSessionRecycler() {
-        return sessionRecycler;
-    }
-
-    public void setSessionRecycler(IoSessionRecycler sessionRecycler) {
-        synchronized (bindLock) {
-            if (isActive()) {
-                throw new IllegalStateException(
-                        "sessionRecycler can't be set while the acceptor is bound.");
-            }
-
-            if (sessionRecycler == null) {
-                sessionRecycler = DEFAULT_RECYCLER;
-            }
-            this.sessionRecycler = sessionRecycler;
+        if (!key.isValid()) { // key is no longer valid (e.g. it was cancelled)
+            return false;
         }
+        return key.isWritable();
     }
 
     @Override
-    protected IoServiceListenerSupport getListeners() {
-        return super.getListeners();
-    }
-
-    IoProcessor<NioSession> getProcessor() {
-        return processor;
-    }
-
-    private class DatagramAcceptorProcessor implements IoProcessor<NioSession> {
-
-        public void add(NioSession session) {
-        }
-
-        public void flush(NioSession session) {
-            if (scheduleFlush((NioDatagramSession) session)) {
-                Selector selector = NioDatagramAcceptor.this.selector;
-                if (selector != null) {
-                    selector.wakeup();
-                }
-            }
-        }
-
-        public void remove(NioSession session) {
-            getSessionRecycler().remove(session);
-            getListeners().fireSessionDestroyed(session);
-        }
-
-        public void updateTrafficMask(NioSession session) {
-            throw new UnsupportedOperationException();
-        }
-
-        public void dispose() {
-            // TODO Implement me.
-        }
+    protected SocketAddress localAddress(DatagramChannel handle)
+            throws Exception {
+        return handle.socket().getLocalSocketAddress(); // local address of the channel's socket (null if the socket is not bound, per DatagramSocket)
     }
 
-    private  void startupWorker() {
-        if (!selector.isOpen()) {
-            registerQueue.clear();
-            cancelQueue.clear();
-            flushingSessions.clear();
-            throw new ClosedSelectorException();
-        }
-        synchronized (this) {
-            if (worker == null) {
-                worker = new Worker();
-                executor.execute(
-                        new NamePreservingRunnable(worker, threadName));
-            }
+    @Override
+    protected NioSession newSession(DatagramChannel handle, // builds a NioDatagramSession for remoteAddress on the given channel
+            SocketAddress remoteAddress) {
+        SelectionKey key = handle.keyFor(selector);
+        if (key == null) { // handle is not registered with this selector: no session is created
+            return null;
         }
+        NioDatagramSession newSession = new NioDatagramSession(
+                this, handle, getProcessor(), remoteAddress);
+        newSession.setSelectionKey(key); // the session reuses the handle's selection key
+        
+        return newSession;
     }
 
-    private boolean scheduleFlush(NioDatagramSession session) {
-        if (session.setScheduledForFlush(true)) {
-            flushingSessions.add(session);
-            return true;
-        } else {
-            return false;
-        }
+    @Override
+    protected SocketAddress receive(DatagramChannel handle, IoBuffer buffer)
+            throws Exception {
+        return handle.receive(buffer.buf()); // returns the datagram's source address, or null if a non-blocking channel had nothing to read
     }
 
-    private class Worker implements Runnable {
-        public void run() {
-            lastIdleCheckTime = System.currentTimeMillis();
-
-            for (; ;) {
-                try {
-                    int nKeys = selector.select(1000);
-
-                    registerNew();
-
-                    if (nKeys > 0) {
-                        processReadySessions(selector.selectedKeys());
-                    }
-
-                    flushSessions();
-                    cancelKeys();
-
-                    notifyIdleSessions();
-
-                    if (selector.keys().isEmpty()) {
-                        synchronized (NioDatagramAcceptor.this) {
-                            if (selector.keys().isEmpty()
-                                    && registerQueue.isEmpty()
-                                    && cancelQueue.isEmpty()) {
-                                worker = null;
-                                break;
-                            }
-                        }
-                    }
-                } catch (IOException e) {
-                    ExceptionMonitor.getInstance().exceptionCaught(e);
-
-                    try {
-                        Thread.sleep(1000);
-                    } catch (InterruptedException e1) {
-                    }
-                }
-            }
-            
-            if (isDisposed()) {
-                disposeNow();
-            }
-        }
+    @Override
+    protected boolean select(int timeout) throws Exception {
+        return selector.select(timeout) > 0; // true when at least one key's ready set was updated; timeout is passed straight to Selector.select (milliseconds)
     }
 
-    private void processReadySessions(Set<SelectionKey> keys) {
-        Iterator<SelectionKey> it = keys.iterator();
-        while (it.hasNext()) {
-            SelectionKey key = it.next();
-            it.remove();
-
-            DatagramChannel ch = (DatagramChannel) key.channel();
-
-            try {
-                if (key.isReadable()) {
-                    readSession(ch);
-                }
-
-                if (key.isWritable()) {
-                    for (IoSession session : getManagedSessions()) {
-                        scheduleFlush((NioDatagramSession) session);
-                    }
-                }
-            } catch (Throwable t) {
-                ExceptionMonitor.getInstance().exceptionCaught(t);
-            }
-        }
+    @Override
+    protected boolean selectable() {
+        return selector.isOpen(); // reports whether the selector is still open
     }
 
-    private void readSession(DatagramChannel channel) throws Exception {
-        IoBuffer readBuf = IoBuffer.allocate(getSessionConfig()
-                .getReceiveBufferSize());
-
-        SocketAddress remoteAddress = channel.receive(readBuf.buf());
-        if (remoteAddress != null) {
-            IoSession session = newSessionWithoutLock(
-                    remoteAddress, channel.socket().getLocalSocketAddress());
-
-            readBuf.flip();
-
-            IoBuffer newBuf = IoBuffer.allocate(readBuf.limit());
-            newBuf.put(readBuf);
-            newBuf.flip();
-
-            session.getFilterChain().fireMessageReceived(newBuf);
-        }
+    @Override
+    protected Iterator<DatagramChannel> selectedHandles() {
+        return new DatagramChannelIterator(selector.selectedKeys()); // view over the selector's selected-key set, mapped to channels
     }
 
-    private void flushSessions() {
-        for (; ;) {
-            NioDatagramSession session = flushingSessions.poll();
-            if (session == null) {
-                break;
-            }
-
-            session.setScheduledForFlush(false);
-
-            try {
-                boolean flushedAll = flush(session);
-                if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
-                    !session.isScheduledForFlush()) {
-                    scheduleFlush(session);
-                }
-            } catch (IOException e) {
-                session.getFilterChain().fireExceptionCaught(e);
-            }
-        }
+    @Override
+    protected int send(NioSession session, IoBuffer buffer, // sends buffer to remoteAddress through the session's channel
+            SocketAddress remoteAddress) throws Exception {
+        return ((DatagramChannel) session.getChannel()).send( // returns the byte count reported by DatagramChannel.send
+                buffer.buf(), remoteAddress);
     }
 
-    private boolean flush(NioDatagramSession session) throws IOException {
-        // Clear OP_WRITE
+    @Override
+    protected void setInterestedInWrite(NioSession session, boolean interested) // adds or removes OP_WRITE in the interest set of the session's selection key
+            throws Exception {
         SelectionKey key = session.getSelectionKey();
         if (key == null) {
-            scheduleFlush(session);
-            return false;
-        }
-        if (!key.isValid()) {
-            return false;
+            return; // session has no selection key: nothing to update
         }
-        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-
-        DatagramChannel ch = session.getChannel();
-        WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
-
-        int writtenBytes = 0;
-        int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
-        for (; ;) {
-            WriteRequest req = session.getCurrentWriteRequest();
-            if (req == null) {
-                req = writeRequestQueue.poll(session);
-                if (req == null) {
-                    break;
-                }
-                session.setCurrentWriteRequest(req);
-            }
-
-            IoBuffer buf = (IoBuffer) req.getMessage();
-            if (buf.remaining() == 0) {
-                // Clear and fire event
-                session.setCurrentWriteRequest(null);
-                buf.reset();
-                session.getFilterChain().fireMessageSent(req);
-                continue;
-            }
-
-            SocketAddress destination = req.getDestination();
-            if (destination == null) {
-                destination = session.getRemoteAddress();
-            }
-
-            int localWrittenBytes = ch.send(buf.buf(), destination);
-            if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
-                // Kernel buffer is full or wrote too much
-                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                return false;
-            } else {
-                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-
-                // Clear and fire event
-                session.setCurrentWriteRequest(null);
-                writtenBytes += localWrittenBytes;
-                buf.reset();
-                session.getFilterChain().fireMessageSent(req);
-            }
+        
+        if (interested) { // NOTE(review): no isValid() guard here, unlike isReadable/isWritable; interestOps() throws CancelledKeyException on a cancelled key
+            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+        } else {
+            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
         }
-
-        return true;
     }
 
-    private void registerNew() {
-        if (registerQueue.isEmpty()) {
-            return;
+    /**
+     * Releases a bound channel: cancels its registration with this
+     * acceptor's selector (if any), then disconnects and closes it.
+     *
+     * @param handle the channel to release
+     * @throws Exception if disconnecting or closing the channel fails
+     */
+    @Override
+    protected void unbind(DatagramChannel handle) throws Exception {
+        SelectionKey key = handle.keyFor(selector);
+        if (key != null) {
+            key.cancel();
+        }
+        try {
+            handle.disconnect();
+        } finally {
+            // Always close, even if disconnect() throws; otherwise the
+            // channel would stay open and its socket would never be released.
+            handle.close();
+        }
+    }
 
-        for (; ;) {
-            ServiceOperationFuture req = registerQueue.poll();
-            if (req == null) {
-                break;
-            }
-
-            Map<SocketAddress, DatagramChannel> newServerChannels =
-                new HashMap<SocketAddress, DatagramChannel>();
-            List<SocketAddress> localAddresses = getLocalAddresses();
-
-            try {
-                for (SocketAddress a: localAddresses) {
-                    DatagramChannel c = null;
-                    boolean success = false;
-                    try {
-                        c = DatagramChannel.open();
-                        DatagramSessionConfig cfg = getSessionConfig();
-                        c.socket().setReuseAddress(cfg.isReuseAddress());
-                        c.socket().setBroadcast(cfg.isBroadcast());
-                        c.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
-                        c.socket().setSendBufferSize(cfg.getSendBufferSize());
+    @Override
+    protected void wakeup() {
+        selector.wakeup(); // makes a blocked (or the next) select() call on this selector return immediately
+    }
+    
+    private static class DatagramChannelIterator implements Iterator<DatagramChannel> { // adapts an iterator over SelectionKeys into an iterator over their channels
         
-                        if (c.socket().getTrafficClass() != cfg.getTrafficClass()) {
-                            c.socket().setTrafficClass(cfg.getTrafficClass());
-                        }
+        private final Iterator<SelectionKey> i; // underlying key iterator; all operations delegate to it
         
-                        c.configureBlocking(false);
-                        c.socket().bind(a);
-                        c.register(selector, SelectionKey.OP_READ, req);
-                        success = true;
-                    } finally {
-                        if (c != null && !success) {
-                            try {
-                                c.disconnect();
-                                c.close();
-                            } catch (Throwable e) {
-                                ExceptionMonitor.getInstance().exceptionCaught(e);
-                            }
-                        }
-                    }
-                    
-                    newServerChannels.put(c.socket().getLocalSocketAddress(), c);
-                }
-                
-                serverChannels.putAll(newServerChannels);
-                
-                getListeners().fireServiceActivated();
-                req.setDone();
-            } catch (Exception e) {
-                req.setException(e);
-            } finally {
-                // Roll back if failed to bind all addresses.
-                if (req.getException() != null) {
-                    for (DatagramChannel c: newServerChannels.values()) {
-                        c.keyFor(selector).cancel();
-                        try {
-                            c.disconnect();
-                            c.close();
-                        } catch (IOException e) {
-                            ExceptionMonitor.getInstance().exceptionCaught(e);
-                        }
-                    }
-                    selector.wakeup();
-                }
-            }
+        private DatagramChannelIterator(Collection<SelectionKey> keys) {
+            this.i = keys.iterator();
+        }
+        
+        public boolean hasNext() {
+            return i.hasNext();
         }
-    }
-
-    private void cancelKeys() {
-        for (; ;) {
-            ServiceOperationFuture request = cancelQueue.poll();
-            if (request == null) {
-                break;
-            }
 
-            // close the channels
-            for (DatagramChannel c: serverChannels.values()) {
-                try {
-                    SelectionKey key = c.keyFor(selector);
-                    key.cancel();
-
-                    selector.wakeup(); // wake up again to trigger thread death
-                    c.disconnect();
-                    c.close();
-                } catch (IOException e) {
-                    ExceptionMonitor.getInstance().exceptionCaught(e);
-                }
-            }
-            
-            serverChannels.clear();
-            request.setDone();
+        public DatagramChannel next() {
+            return (DatagramChannel) i.next().channel(); // assumes every key belongs to a DatagramChannel; any other channel type fails the cast
         }
-    }
 
-    private void notifyIdleSessions() {
-        // process idle sessions
-        long currentTime = System.currentTimeMillis();
-        if (currentTime - lastIdleCheckTime >= 1000) {
-            lastIdleCheckTime = currentTime;
-            IdleStatusChecker.notifyIdleness(
-                    getListeners().getManagedSessions().iterator(),
-                    currentTime);
+        public void remove() {
+            i.remove(); // removes the last returned key from the underlying key collection
         }
+        
     }
 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java?rev=596909&r1=596908&r2=596909&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java Tue Nov 20 18:31:56 2007
@@ -99,7 +99,7 @@
 
     @Override
     protected NioSession newSession(IoProcessor<NioSession> processor,
-            DatagramChannel handle) throws Exception {
+            DatagramChannel handle) { // creates a NioDatagramSession from this connector, the handle and the processor
         return new NioDatagramSession(this, handle, processor);
     }
 

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java?rev=596909&r1=596908&r2=596909&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketConnector.java Tue Nov 20 18:31:56 2007
@@ -149,8 +149,7 @@
     }
 
     @Override
-    protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle)
-            throws Exception {
+    protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle) { // creates a NioSocketSession from this connector, the processor and the handle
         return new NioSocketSession(this, processor, handle);
     }
 

Modified: mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java?rev=596909&r1=596908&r2=596909&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java Tue Nov 20 18:31:56 2007
@@ -30,6 +30,7 @@
 import org.apache.mina.common.IoBuffer;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
 import org.apache.mina.util.AvailablePortFinder;
 
 /**
@@ -138,8 +139,9 @@
             acceptorHandler.session = null;
             
             // Write whatever to trigger the acceptor again.
-            future.getSession().write(IoBuffer.allocate(1))
-                    .awaitUninterruptibly();
+            WriteFuture wf = future.getSession().write(
+                    IoBuffer.allocate(1)).awaitUninterruptibly();
+            Assert.assertTrue(wf.isWritten());
             
             // Make sure the connection is closed before recycler closes it.
             while (acceptorHandler.session == null) {