You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/08/14 15:16:31 UTC

svn commit: r565735 - in /mina/trunk: core/src/main/java/org/apache/mina/common/ core/src/main/java/org/apache/mina/filter/codec/ core/src/main/java/org/apache/mina/filter/codec/demux/ core/src/main/java/org/apache/mina/transport/socket/nio/ core/src/m...

Author: trustin
Date: Tue Aug 14 06:16:29 2007
New Revision: 565735

URL: http://svn.apache.org/viewvc?view=rev&rev=565735
Log:
* Renamed TransportType to IoServiceMetadata
* Added IoServiceMetadata.hasFragmentation()
* Added DefaultIoServiceMetadata
* Removed static fields in IoServiceMetadata which caused bad coupling
* Removed IoSession.getTransportType() because IoServiceMetadata can be retrieved via IoSession.getService().getMetadata()
* Removed transport.socket.nio.support package, and moved the classes there to transport.socket.nio.  Datagram transport doesn't use wrapper anymore.
* Fixed freezing SocketBindTest




Added:
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramFilterChain.java
      - copied, changed from r565678, mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramServiceMetadata.java   (with props)
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
      - copied, changed from r565678, mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketServiceMetadata.java   (with props)
    mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeServiceMetadata.java   (with props)
Removed:
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/
Modified:
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoFilterChain.java
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
    mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoServiceMetadata.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceMetadata.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java
    mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
    mina/trunk/core/src/main/java/org/apache/mina/filter/codec/demux/DemuxingProtocolCodecFactory.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSession.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
    mina/trunk/core/src/test/java/org/apache/mina/common/FutureTest.java
    mina/trunk/core/src/test/java/org/apache/mina/common/IoFilterChainTest.java
    mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java
    mina/trunk/core/src/test/java/org/apache/mina/filter/codec/CumulativeProtocolDecoderTest.java
    mina/trunk/core/src/test/java/org/apache/mina/handler/chain/ChainedIoHandlerTest.java
    mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java
    mina/trunk/example/src/test/java/org/apache/mina/example/echoserver/ConnectorTest.java
    mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java
    mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoFilterChain.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoFilterChain.java Tue Aug 14 06:16:29 2007
@@ -567,9 +567,6 @@
         @Override
         public void filterWrite(NextFilter nextFilter, IoSession session,
                 WriteRequest writeRequest) throws Exception {
-            // I used assertion here because isAssignableFrom takes a lot of CPU.
-            assert session.getTransportType().getEnvelopeType()
-                    .isAssignableFrom(writeRequest.getMessage().getClass());
             doWrite(session, writeRequest);
         }
 

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java Tue Aug 14 06:16:29 2007
@@ -536,8 +536,22 @@
 
     @Override
     public String toString() {
-        return "(" + getTransportType() + ", R: " + getRemoteAddress()
-                + ", L: " + getLocalAddress() + ", S: " + getServiceAddress()
+        return "(" + getServiceName()
+                + ", R: " + getRemoteAddress()
+                + ", L: " + getLocalAddress()
+                + ", S: " + getServiceAddress()
                 + ')';
+    }
+    
+    private String getServiceName() {
+        if (getService() == null) {
+            return "null";
+        }
+        
+        if (getService().getMetadata() == null) {
+            return "null";
+        }
+        
+        return getService().getMetadata().getName();
     }
 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoServiceMetadata.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoServiceMetadata.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoServiceMetadata.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoServiceMetadata.java Tue Aug 14 06:16:29 2007
@@ -31,16 +31,16 @@
 public class DefaultIoServiceMetadata implements IoServiceMetadata {
 
     private final String name;
-
     private final boolean connectionless;
-
+    private final boolean fragmentation;
     private final Class<? extends SocketAddress> addressType;
-
     private final Class<? extends Object> envelopeType;
-
     private final Class<? extends IoSessionConfig> sessionConfigType;
 
-    public DefaultIoServiceMetadata(String name, boolean connectionless,
+    public DefaultIoServiceMetadata(
+            String name,
+            boolean connectionless,
+            boolean fragmentation,
             Class<? extends SocketAddress> addressType,
             Class<? extends Object> envelopeType,
             Class<? extends IoSessionConfig> sessionConfigType) {
@@ -62,13 +62,14 @@
         if (envelopeType == null) {
             throw new NullPointerException("envelopeType");
         }
-
+        
         if (sessionConfigType == null) {
             throw new NullPointerException("sessionConfigType");
         }
 
         this.name = name;
         this.connectionless = connectionless;
+        this.fragmentation = fragmentation;
         this.addressType = addressType;
         this.envelopeType = envelopeType;
         this.sessionConfigType = sessionConfigType;
@@ -92,6 +93,10 @@
 
     public boolean isConnectionless() {
         return connectionless;
+    }
+    
+    public boolean hasFragmentation() {
+        return fragmentation;
     }
 
     @Override

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceMetadata.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceMetadata.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceMetadata.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceMetadata.java Tue Aug 14 06:16:29 2007
@@ -19,16 +19,10 @@
  */
 package org.apache.mina.common;
 
-import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 
-import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.mina.transport.vmpipe.VmPipeAddress;
-import org.apache.mina.transport.vmpipe.VmPipeSessionConfig;
-
 /**
- * Represents a network transport type and its related metadata.
+ * Provides meta-information that describes an {@link IoService}.
  * 
  * @author The Apache MINA Project (dev@mina.apache.org)
  * @version $Rev$, $Date$
@@ -36,48 +30,36 @@
 public interface IoServiceMetadata {
 
     /**
-     * A pre-defined transport type for TCP/IP socket transport.
-     */
-    static IoServiceMetadata SOCKET = new DefaultIoServiceMetadata("socket", false,
-            InetSocketAddress.class, ByteBuffer.class,
-            SocketSessionConfig.class);
-
-    /**
-     * A pre-defined transport type for UDP/IP datagram transport.
-     */
-    static IoServiceMetadata DATAGRAM = new DefaultIoServiceMetadata("datagram", true,
-            InetSocketAddress.class, ByteBuffer.class,
-            DatagramSessionConfig.class);
-
-    /**
-     * A pre-defined transport type for in-VM pipe transport.
-     */
-    static IoServiceMetadata VM_PIPE = new DefaultIoServiceMetadata("vmpipe", false,
-            VmPipeAddress.class, Object.class, VmPipeSessionConfig.class);
-
-    /**
-     * Returns the name of this transport type.
+     * Returns the name of the service.
      */
     String getName();
 
     /**
      * Returns <code>true</code> if the session of this transport type is
-     * connectionless.
+     * <a href="http://en.wikipedia.org/wiki/Connectionless">connectionless</a>.
      */
     boolean isConnectionless();
+    
+    /**
+     * Returns {@code true} if the messages exchanged by the service can be
+     * <a href="http://en.wikipedia.org/wiki/IPv4#Fragmentation_and_reassembly">fragmented
+     * or reassembled</a> by its underlying transport. 
+     */
+    boolean hasFragmentation();
 
     /**
-     * Returns the address type of this transport type.
+     * Returns the address type of the service.
      */
     Class<? extends SocketAddress> getAddressType();
 
     /**
-     * Returns the type of the envelope message of this transport type.
+     * Returns the allowed message type when you write to an
+     * {@link IoSession} that is managed by the service.
      */
     Class<? extends Object> getEnvelopeType();
 
     /**
-     * Returns the type of the {@link IoSessionConfig} of this transport type.
+     * Returns the type of the {@link IoSessionConfig} of the service
      */
     Class<? extends IoSessionConfig> getSessionConfigType();
 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java Tue Aug 14 06:16:29 2007
@@ -209,11 +209,6 @@
     Set<String> getAttributeKeys();
 
     /**
-     * Returns transport type of this session.
-     */
-    IoServiceMetadata getTransportType();
-
-    /**
      * Returns <code>true</code> if this session is connected with remote peer.
      */
     boolean isConnected();

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java Tue Aug 14 06:16:29 2007
@@ -218,7 +218,7 @@
             throw pee;
         } finally {
             // Dispose the encoder if this session is connectionless.
-            if (session.getTransportType().isConnectionless()) {
+            if (session.getService().getMetadata().isConnectionless()) {
                 disposeEncoder(session);
             }
         }

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/codec/demux/DemuxingProtocolCodecFactory.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/codec/demux/DemuxingProtocolCodecFactory.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/codec/demux/DemuxingProtocolCodecFactory.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/codec/demux/DemuxingProtocolCodecFactory.java Tue Aug 14 06:16:29 2007
@@ -151,7 +151,7 @@
         // Do nothing by default; let users implement it as they want.
 
         // This statement is just to avoid compiler warning.  Please ignore. 
-        session.getTransportType();
+        session.getService();
     }
 
     private class ProtocolEncoderImpl implements ProtocolEncoder {

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java Tue Aug 14 06:16:29 2007
@@ -19,14 +19,30 @@
  */
 package org.apache.mina.transport.socket.nio;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 
-import org.apache.mina.common.IoAcceptorWrapper;
+import org.apache.mina.common.AbstractIoAcceptor;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.ExpiringIoSessionRecycler;
 import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoServiceListenerSupport;
+import org.apache.mina.common.IoServiceMetadata;
+import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionRecycler;
-import org.apache.mina.transport.socket.nio.support.DatagramAcceptorDelegate;
+import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.common.WriteRequest;
+import org.apache.mina.util.NamePreservingRunnable;
 import org.apache.mina.util.NewThreadExecutor;
 
 /**
@@ -35,46 +51,179 @@
  * @author The Apache MINA Project (dev@mina.apache.org)
  * @version $Rev$, $Date$
  */
-public class DatagramAcceptor extends IoAcceptorWrapper {
+public class DatagramAcceptor extends AbstractIoAcceptor implements
+        IoAcceptor {
+    private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringIoSessionRecycler();
+
+    private static volatile int nextId = 0;
+
+    private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
+
+    private final Executor executor;
+
+    private final int id = nextId++;
+
+    private final Selector selector;
+
+    private DatagramChannel channel;
+
+    private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
+
+    private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();
+
+    private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
+
+    private Worker worker;
+    
     /**
-     * Creates a new instance using a NewThreadExecutor
+     * Creates a new instance.
      */
     public DatagramAcceptor() {
-        init(new DatagramAcceptorDelegate(this, new NewThreadExecutor()));
+        this(new NewThreadExecutor());
     }
 
     /**
      * Creates a new instance.
-     * 
-     * @param executor Executor to use for launching threads
      */
     public DatagramAcceptor(Executor executor) {
-        init(new DatagramAcceptorDelegate(this, executor));
+        super(new DefaultDatagramSessionConfig());
+
+        // The default reuseAddress should be 'true' for an accepted socket.
+        getSessionConfig().setReuseAddress(true);
+
+        try {
+            this.selector = Selector.open();
+        } catch (IOException e) {
+            throw new RuntimeIOException("Failed to open a selector.", e);
+        }
+
+        this.executor = executor;
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        super.finalize();
+        try {
+            selector.close();
+        } catch (IOException e) {
+            ExceptionMonitor.getInstance().exceptionCaught(e);
+        }
+    }
+
+    public IoServiceMetadata getMetadata() {
+        return DatagramServiceMetadata.INSTANCE;
     }
 
     @Override
     public DatagramSessionConfig getSessionConfig() {
-        return ((DatagramAcceptorDelegate) acceptor).getSessionConfig();
+        return (DatagramSessionConfig) super.getSessionConfig();
     }
 
     @Override
     public InetSocketAddress getLocalAddress() {
-        return ((DatagramAcceptorDelegate) acceptor).getLocalAddress();
+        return (InetSocketAddress) super.getLocalAddress();
     }
 
-    // This method is overriden to work around a problem with
-    // bean property access mechanism.
+    @Override
+    protected void doBind() throws IOException {
+        RegistrationRequest request = new RegistrationRequest();
+
+        startupWorker();
+        registerQueue.offer(request);
+        selector.wakeup();
+
+        synchronized (request) {
+            while (!request.done) {
+                try {
+                    request.wait();
+                } catch (InterruptedException e) {
+                }
+            }
+        }
+
+        if (request.exception != null) {
+            throw (IOException) new IOException("Failed to bind")
+                    .initCause(request.exception);
+        } else {
+            setLocalAddress(channel.socket().getLocalSocketAddress());
+        }
+    }
 
     @Override
-    public void setLocalAddress(SocketAddress localAddress) {
-        super.setLocalAddress(localAddress);
+    protected void doUnbind() {
+        CancellationRequest request = new CancellationRequest();
+
+        startupWorker();
+        cancelQueue.offer(request);
+        selector.wakeup();
+
+        synchronized (request) {
+            while (!request.done) {
+                try {
+                    request.wait();
+                } catch (InterruptedException e) {
+                }
+            }
+        }
+
+        if (request.exception != null) {
+            throw new RuntimeException("Failed to unbind", request.exception);
+        }
+    }
+
+    public IoSession newSession(SocketAddress remoteAddress) {
+        if (remoteAddress == null) {
+            throw new NullPointerException("remoteAddress");
+        }
+
+        synchronized (bindLock) {
+            if (!isBound()) {
+                throw new IllegalStateException(
+                        "Can't create a session from a unbound service.");
+            }
+
+            return newSessionWithoutLock(remoteAddress);
+        }
+    }
+
+    private IoSession newSessionWithoutLock(SocketAddress remoteAddress) {
+        Selector selector = this.selector;
+        DatagramChannel ch = this.channel;
+        SelectionKey key = ch.keyFor(selector);
+
+        IoSession session;
+        IoSessionRecycler sessionRecycler = getSessionRecycler();
+        synchronized (sessionRecycler) {
+            session = sessionRecycler.recycle(getLocalAddress(), remoteAddress);
+            if (session != null) {
+                return session;
+            }
+
+            // If a new session needs to be created.
+            DatagramSessionImpl datagramSession = new DatagramSessionImpl(
+                    this, ch, getHandler(),
+                    (InetSocketAddress) remoteAddress);
+            datagramSession.setSelectionKey(key);
+
+            getSessionRecycler().put(datagramSession);
+            session = datagramSession;
+        }
+
+        try {
+            buildFilterChain(session);
+            getListeners().fireSessionCreated(session);
+        } catch (Throwable t) {
+            ExceptionMonitor.getInstance().exceptionCaught(t);
+        }
+
+        return session;
     }
 
     /**
      * Returns the {@link IoSessionRecycler} for this service.
      */
     public IoSessionRecycler getSessionRecycler() {
-        return ((DatagramAcceptorDelegate) acceptor).getSessionRecycler();
+        return sessionRecycler;
     }
 
     /**
@@ -83,7 +232,299 @@
      * @param sessionRecycler <tt>null</tt> to use the default recycler
      */
     public void setSessionRecycler(IoSessionRecycler sessionRecycler) {
-        ((DatagramAcceptorDelegate) acceptor)
-                .setSessionRecycler(sessionRecycler);
+        synchronized (bindLock) {
+            if (isBound()) {
+                throw new IllegalStateException(
+                        "sessionRecycler can't be set while the acceptor is bound.");
+            }
+
+            if (sessionRecycler == null) {
+                sessionRecycler = DEFAULT_RECYCLER;
+            }
+            this.sessionRecycler = sessionRecycler;
+        }
+    }
+
+    @Override
+    protected IoServiceListenerSupport getListeners() {
+        return super.getListeners();
+    }
+
+    private void buildFilterChain(IoSession session) throws Exception {
+        this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+    }
+
+    private synchronized void startupWorker() {
+        if (worker == null) {
+            worker = new Worker();
+            executor.execute(new NamePreservingRunnable(worker));
+        }
+    }
+
+    void flushSession(DatagramSessionImpl session) {
+        scheduleFlush(session);
+        Selector selector = this.selector;
+        if (selector != null) {
+            selector.wakeup();
+        }
+    }
+
+    private void scheduleFlush(DatagramSessionImpl session) {
+        flushingSessions.offer(session);
+    }
+
+    private class Worker implements Runnable {
+        public void run() {
+            Thread.currentThread().setName("DatagramAcceptor-" + id);
+
+            for (;;) {
+                try {
+                    int nKeys = selector.select();
+
+                    registerNew();
+
+                    if (nKeys > 0) {
+                        processReadySessions(selector.selectedKeys());
+                    }
+
+                    flushSessions();
+                    cancelKeys();
+
+                    if (selector.keys().isEmpty()) {
+                        synchronized (DatagramAcceptor.this) {
+                            if (selector.keys().isEmpty()
+                                    && registerQueue.isEmpty()
+                                    && cancelQueue.isEmpty()) {
+                                worker = null;
+                                break;
+                            }
+                        }
+                    }
+                } catch (IOException e) {
+                    ExceptionMonitor.getInstance().exceptionCaught(e);
+
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e1) {
+                    }
+                }
+            }
+        }
+    }
+
+    private void processReadySessions(Set<SelectionKey> keys) {
+        Iterator<SelectionKey> it = keys.iterator();
+        while (it.hasNext()) {
+            SelectionKey key = it.next();
+            it.remove();
+
+            DatagramChannel ch = (DatagramChannel) key.channel();
+
+            try {
+                if (key.isReadable()) {
+                    readSession(ch);
+                }
+
+                if (key.isWritable()) {
+                    for (IoSession session : getManagedSessions()) {
+                        scheduleFlush((DatagramSessionImpl) session);
+                    }
+                }
+            } catch (Throwable t) {
+                ExceptionMonitor.getInstance().exceptionCaught(t);
+            }
+        }
+    }
+
+    private void readSession(DatagramChannel channel) throws Exception {
+        ByteBuffer readBuf = ByteBuffer.allocate(getSessionConfig()
+                .getReceiveBufferSize());
+
+        SocketAddress remoteAddress = channel.receive(readBuf.buf());
+        if (remoteAddress != null) {
+            DatagramSessionImpl session = (DatagramSessionImpl) newSessionWithoutLock(remoteAddress);
+
+            readBuf.flip();
+
+            ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
+            newBuf.put(readBuf);
+            newBuf.flip();
+
+            session.increaseReadBytes(newBuf.remaining());
+            session.getFilterChain().fireMessageReceived(session, newBuf);
+        }
+    }
+
+    private void flushSessions() {
+        for (;;) {
+            DatagramSessionImpl session = flushingSessions.poll();
+            if (session == null) {
+                break;
+            }
+
+            try {
+                flush(session);
+            } catch (IOException e) {
+                session.getFilterChain().fireExceptionCaught(session, e);
+            }
+        }
+    }
+
+    private void flush(DatagramSessionImpl session) throws IOException {
+        DatagramChannel ch = session.getChannel();
+
+        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
+
+        WriteRequest req;
+        for (;;) {
+            synchronized (writeRequestQueue) {
+                req = writeRequestQueue.peek();
+            }
+
+            if (req == null) {
+                break;
+            }
+
+            ByteBuffer buf = (ByteBuffer) req.getMessage();
+            if (buf.remaining() == 0) {
+                // pop and fire event
+                synchronized (writeRequestQueue) {
+                    writeRequestQueue.poll();
+                }
+
+                session.increaseWrittenMessages();
+                buf.reset();
+                ((DatagramFilterChain) session.getFilterChain())
+                        .fireMessageSent(session, req);
+                continue;
+            }
+
+            SelectionKey key = session.getSelectionKey();
+            if (key == null) {
+                scheduleFlush(session);
+                break;
+            }
+            if (!key.isValid()) {
+                continue;
+            }
+
+            SocketAddress destination = req.getDestination();
+            if (destination == null) {
+                destination = session.getRemoteAddress();
+            }
+
+            int writtenBytes = ch.send(buf.buf(), destination);
+
+            if (writtenBytes == 0) {
+                // Kernel buffer is full
+                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+            } else if (writtenBytes > 0) {
+                key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+
+                // pop and fire event
+                synchronized (writeRequestQueue) {
+                    writeRequestQueue.poll();
+                }
+
+                session.increaseWrittenBytes(writtenBytes);
+                session.increaseWrittenMessages();
+                buf.reset();
+                session.getFilterChain().fireMessageSent(session, req);
+            }
+        }
+    }
+
+    private void registerNew() {
+        if (registerQueue.isEmpty()) {
+            return;
+        }
+
+        for (;;) {
+            RegistrationRequest req = registerQueue.poll();
+            if (req == null) {
+                break;
+            }
+
+            DatagramChannel ch = null;
+            try {
+                ch = DatagramChannel.open();
+                DatagramSessionConfig cfg = getSessionConfig();
+                ch.socket().setReuseAddress(cfg.isReuseAddress());
+                ch.socket().setBroadcast(cfg.isBroadcast());
+                ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
+                ch.socket().setSendBufferSize(cfg.getSendBufferSize());
+
+                if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {
+                    ch.socket().setTrafficClass(cfg.getTrafficClass());
+                }
+
+                ch.configureBlocking(false);
+                ch.socket().bind(getLocalAddress());
+                ch.register(selector, SelectionKey.OP_READ, req);
+                this.channel = ch;
+
+                getListeners().fireServiceActivated();
+            } catch (Throwable t) {
+                req.exception = t;
+            } finally {
+                synchronized (req) {
+                    req.done = true;
+                    req.notify();
+                }
+
+                if (ch != null && req.exception != null) {
+                    try {
+                        ch.disconnect();
+                        ch.close();
+                    } catch (Throwable e) {
+                        ExceptionMonitor.getInstance().exceptionCaught(e);
+                    }
+                }
+            }
+        }
+    }
+
+    private void cancelKeys() {
+        for (;;) {
+            CancellationRequest request = cancelQueue.poll();
+            if (request == null) {
+                break;
+            }
+
+            DatagramChannel ch = this.channel;
+            this.channel = null;
+
+            // close the channel
+            try {
+                SelectionKey key = ch.keyFor(selector);
+                key.cancel();
+                selector.wakeup(); // wake up again to trigger thread death
+                ch.disconnect();
+                ch.close();
+            } catch (Throwable t) {
+                ExceptionMonitor.getInstance().exceptionCaught(t);
+            } finally {
+                synchronized (request) {
+                    request.done = true;
+                    request.notify();
+                }
+
+                if (request.exception == null) {
+                    getListeners().fireServiceDeactivated();
+                }
+            }
+        }
+    }
+
+    private static class RegistrationRequest {
+        private Throwable exception;
+
+        private boolean done;
+    }
+
+    private static class CancellationRequest {
+        private boolean done;
+
+        private RuntimeException exception;
     }
 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java Tue Aug 14 06:16:29 2007
@@ -19,11 +19,29 @@
  */
 package org.apache.mina.transport.socket.nio;
 
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 
-import org.apache.mina.common.IoConnectorWrapper;
+import org.apache.mina.common.AbstractIoConnector;
+import org.apache.mina.common.AbstractIoFilterChain;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.DefaultConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoConnector;
-import org.apache.mina.transport.socket.nio.support.DatagramConnectorDelegate;
+import org.apache.mina.common.IoServiceMetadata;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.common.WriteRequest;
+import org.apache.mina.util.NamePreservingRunnable;
 import org.apache.mina.util.NewThreadExecutor;
 
 /**
@@ -32,25 +50,409 @@
  * @author The Apache MINA Project (dev@mina.apache.org)
  * @version $Rev$, $Date$
  */
-public class DatagramConnector extends IoConnectorWrapper {
+public class DatagramConnector extends AbstractIoConnector {
+    private static volatile int nextId = 0;
+
+    private final Executor executor;
+
+    private final int id = nextId++;
+
+    private final Selector selector;
+
+    private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
+
+    private final Queue<DatagramSessionImpl> cancelQueue = new ConcurrentLinkedQueue<DatagramSessionImpl>();
+
+    private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
+
+    private final Queue<DatagramSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
+
+    private Worker worker;
+
     /**
-     * Creates a new instance using a NewThreadExecutor 
+     * Creates a new instance.
      */
     public DatagramConnector() {
-        init(new DatagramConnectorDelegate(this, new NewThreadExecutor()));
+        this(new NewThreadExecutor());
     }
 
     /**
      * Creates a new instance.
-     * 
-     * @param executor Executor to use for launching threads
      */
     public DatagramConnector(Executor executor) {
-        init(new DatagramConnectorDelegate(this, executor));
+        super(new DefaultDatagramSessionConfig());
+
+        try {
+            this.selector = Selector.open();
+        } catch (IOException e) {
+            throw new RuntimeIOException("Failed to open a selector.", e);
+        }
+
+        this.executor = executor;
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        super.finalize();
+        try {
+            selector.close();
+        } catch (IOException e) {
+            ExceptionMonitor.getInstance().exceptionCaught(e);
+        }
+    }
+
+    public IoServiceMetadata getMetadata() {
+        return DatagramServiceMetadata.INSTANCE;
     }
 
     @Override
     public DatagramSessionConfig getSessionConfig() {
-        return ((DatagramConnectorDelegate) connector).getSessionConfig();
+        return (DatagramSessionConfig) super.getSessionConfig();
+    }
+
+    @Override
+    protected ConnectFuture doConnect(SocketAddress remoteAddress,
+            SocketAddress localAddress) {
+        DatagramChannel ch = null;
+        boolean initialized = false;
+        try {
+            ch = DatagramChannel.open();
+            DatagramSessionConfig cfg = getSessionConfig();
+
+            ch.socket().setReuseAddress(cfg.isReuseAddress());
+            ch.socket().setBroadcast(cfg.isBroadcast());
+            ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
+            ch.socket().setSendBufferSize(cfg.getSendBufferSize());
+
+            if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {
+                ch.socket().setTrafficClass(cfg.getTrafficClass());
+            }
+
+            if (localAddress != null) {
+                ch.socket().bind(localAddress);
+            }
+            ch.connect(remoteAddress);
+            ch.configureBlocking(false);
+            initialized = true;
+        } catch (IOException e) {
+            return DefaultConnectFuture.newFailedFuture(e);
+        } finally {
+            if (!initialized && ch != null) {
+                try {
+                    ch.disconnect();
+                    ch.close();
+                } catch (IOException e) {
+                    ExceptionMonitor.getInstance().exceptionCaught(e);
+                }
+            }
+        }
+
+        RegistrationRequest request = new RegistrationRequest(ch);
+
+        startupWorker();
+        registerQueue.offer(request);
+        selector.wakeup();
+
+        return request;
+    }
+
+    private synchronized void startupWorker() {
+        if (worker == null) {
+            worker = new Worker();
+            executor.execute(new NamePreservingRunnable(worker));
+        }
+    }
+
+    void closeSession(DatagramSessionImpl session) {
+        startupWorker();
+        cancelQueue.offer(session);
+        selector.wakeup();
+    }
+
+    void flushSession(DatagramSessionImpl session) {
+        scheduleFlush(session);
+        Selector selector = this.selector;
+        if (selector != null) {
+            selector.wakeup();
+        }
+    }
+
+    private void scheduleFlush(DatagramSessionImpl session) {
+        flushingSessions.offer(session);
+    }
+
+    void updateTrafficMask(DatagramSessionImpl session) {
+        scheduleTrafficControl(session);
+        Selector selector = this.selector;
+        if (selector != null) {
+            selector.wakeup();
+        }
+        selector.wakeup();
+    }
+
+    private void scheduleTrafficControl(DatagramSessionImpl session) {
+        trafficControllingSessions.offer(session);
+    }
+
+    private void doUpdateTrafficMask() {
+        for (;;) {
+            DatagramSessionImpl session = trafficControllingSessions.poll();
+            if (session == null) {
+                break;
+            }
+
+            SelectionKey key = session.getSelectionKey();
+            // Retry later if session is not yet fully initialized.
+            // (In case that Session.suspend??() or session.resume??() is 
+            // called before addSession() is processed)
+            if (key == null) {
+                scheduleTrafficControl(session);
+                break;
+            }
+            // skip if channel is already closed
+            if (!key.isValid()) {
+                continue;
+            }
+
+            // The normal is OP_READ and, if there are write requests in the
+            // session's write queue, set OP_WRITE to trigger flushing.
+            int ops = SelectionKey.OP_READ;
+            Queue<WriteRequest> writeRequestQueue = session
+                    .getWriteRequestQueue();
+            synchronized (writeRequestQueue) {
+                if (!writeRequestQueue.isEmpty()) {
+                    ops |= SelectionKey.OP_WRITE;
+                }
+            }
+
+            // Now mask the preferred ops with the mask of the current session
+            int mask = session.getTrafficMask().getInterestOps();
+            key.interestOps(ops & mask);
+        }
+    }
+
+    private class Worker implements Runnable {
+        public void run() {
+            Thread.currentThread().setName("DatagramConnector-" + id);
+
+            for (;;) {
+                try {
+                    int nKeys = selector.select();
+
+                    registerNew();
+                    doUpdateTrafficMask();
+
+                    if (nKeys > 0) {
+                        processReadySessions(selector.selectedKeys());
+                    }
+
+                    flushSessions();
+                    cancelKeys();
+
+                    if (selector.keys().isEmpty()) {
+                        synchronized (DatagramConnector.this) {
+                            if (selector.keys().isEmpty()
+                                    && registerQueue.isEmpty()
+                                    && cancelQueue.isEmpty()) {
+                                worker = null;
+                                break;
+                            }
+                        }
+                    }
+                } catch (IOException e) {
+                    ExceptionMonitor.getInstance().exceptionCaught(e);
+
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e1) {
+                    }
+                }
+            }
+        }
+    }
+
+    private void processReadySessions(Set<SelectionKey> keys) {
+        Iterator<SelectionKey> it = keys.iterator();
+        while (it.hasNext()) {
+            SelectionKey key = it.next();
+            it.remove();
+
+            DatagramSessionImpl session = (DatagramSessionImpl) key
+                    .attachment();
+
+            if (key.isReadable() && session.getTrafficMask().isReadable()) {
+                readSession(session);
+            }
+
+            if (key.isWritable() && session.getTrafficMask().isWritable()) {
+                scheduleFlush(session);
+            }
+        }
+    }
+
+    private void readSession(DatagramSessionImpl session) {
+
+        ByteBuffer readBuf = ByteBuffer.allocate(session.getReadBufferSize());
+        try {
+            int readBytes = session.getChannel().read(readBuf.buf());
+            if (readBytes > 0) {
+                readBuf.flip();
+                ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
+                newBuf.put(readBuf);
+                newBuf.flip();
+
+                session.increaseReadBytes(readBytes);
+                session.getFilterChain().fireMessageReceived(session, newBuf);
+            }
+        } catch (IOException e) {
+            session.getFilterChain().fireExceptionCaught(session, e);
+        }
+    }
+
+    private void flushSessions() {
+        for (;;) {
+            DatagramSessionImpl session = flushingSessions.poll();
+            if (session == null) {
+                break;
+            }
+
+            try {
+                flush(session);
+            } catch (IOException e) {
+                session.getFilterChain().fireExceptionCaught(session, e);
+            }
+        }
+    }
+
+    private void flush(DatagramSessionImpl session) throws IOException {
+        DatagramChannel ch = session.getChannel();
+
+        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
+
+        WriteRequest req;
+        for (;;) {
+            synchronized (writeRequestQueue) {
+                req = writeRequestQueue.peek();
+            }
+
+            if (req == null) {
+                break;
+            }
+
+            ByteBuffer buf = (ByteBuffer) req.getMessage();
+            if (buf.remaining() == 0) {
+                // pop and fire event
+                synchronized (writeRequestQueue) {
+                    writeRequestQueue.poll();
+                }
+
+                session.increaseWrittenMessages();
+                buf.reset();
+                session.getFilterChain().fireMessageSent(session, req);
+                continue;
+            }
+
+            SelectionKey key = session.getSelectionKey();
+            if (key == null) {
+                scheduleFlush(session);
+                break;
+            }
+            if (!key.isValid()) {
+                continue;
+            }
+
+            int writtenBytes = ch.write(buf.buf());
+
+            if (writtenBytes == 0) {
+                // Kernel buffer is full
+                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+            } else if (writtenBytes > 0) {
+                key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+
+                // pop and fire event
+                synchronized (writeRequestQueue) {
+                    writeRequestQueue.poll();
+                }
+
+                session.increaseWrittenBytes(writtenBytes);
+                session.increaseWrittenMessages();
+                buf.reset();
+                session.getFilterChain().fireMessageSent(session, req);
+            }
+        }
+    }
+
+    private void registerNew() {
+        for (;;) {
+            RegistrationRequest req = registerQueue.poll();
+            if (req == null) {
+                break;
+            }
+
+            DatagramSessionImpl session = new DatagramSessionImpl(
+                    this, req.channel, getHandler());
+
+            // AbstractIoFilterChain will notify the connect future.
+            session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, req);
+
+            boolean success = false;
+            try {
+                SelectionKey key = req.channel.register(selector,
+                        SelectionKey.OP_READ, session);
+
+                session.setSelectionKey(key);
+                buildFilterChain(session);
+                // The CONNECT_FUTURE attribute is cleared and notified here.
+                getListeners().fireSessionCreated(session);
+                success = true;
+            } catch (Throwable t) {
+                // The CONNECT_FUTURE attribute is cleared and notified here.
+                session.getFilterChain().fireExceptionCaught(session, t);
+            } finally {
+                if (!success) {
+                    try {
+                        req.channel.disconnect();
+                        req.channel.close();
+                    } catch (IOException e) {
+                        ExceptionMonitor.getInstance().exceptionCaught(e);
+                    }
+                }
+            }
+        }
+    }
+
+    private void buildFilterChain(IoSession session) throws Exception {
+        getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+    }
+
+    private void cancelKeys() {
+        for (;;) {
+            DatagramSessionImpl session = cancelQueue.poll();
+            if (session == null) {
+                break;
+            } else {
+                SelectionKey key = session.getSelectionKey();
+                DatagramChannel ch = (DatagramChannel) key.channel();
+                try {
+                    ch.disconnect();
+                    ch.close();
+                } catch (IOException e) {
+                    ExceptionMonitor.getInstance().exceptionCaught(e);
+                }
+
+                getListeners().fireSessionDestroyed(session);
+                session.getCloseFuture().setClosed();
+                key.cancel();
+                selector.wakeup(); // wake up again to trigger thread death
+            }
+        }
+    }
+
+    private static class RegistrationRequest extends DefaultConnectFuture {
+        private final DatagramChannel channel;
+
+        private RegistrationRequest(DatagramChannel channel) {
+            this.channel = channel;
+        }
     }
 }

Copied: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramFilterChain.java (from r565678, mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java)
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramFilterChain.java?view=diff&rev=565735&p1=mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java&r1=565678&p2=mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramFilterChain.java&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramFilterChain.java Tue Aug 14 06:16:29 2007
@@ -17,13 +17,14 @@
  *  under the License. 
  *  
  */
-package org.apache.mina.transport.socket.nio.support;
+package org.apache.mina.transport.socket.nio;
 
 import java.util.Queue;
 
 import org.apache.mina.common.AbstractIoFilterChain;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoService;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.WriteRequest;
 
@@ -42,16 +43,21 @@
     protected void doWrite(IoSession session, WriteRequest writeRequest) {
         DatagramSessionImpl s = (DatagramSessionImpl) session;
         Queue<WriteRequest> writeRequestQueue = s.getWriteRequestQueue();
-
-        // SocketIoProcessor.doFlush() will reset it after write is finished
-        // because the buffer will be passed with messageSent event. 
         ((ByteBuffer) writeRequest.getMessage()).mark();
+
+        int writeRequestQueueSize;
         synchronized (writeRequestQueue) {
             writeRequestQueue.offer(writeRequest);
-            if (writeRequestQueue.size() == 1
-                    && session.getTrafficMask().isWritable()) {
-                // Notify DatagramService only when writeRequestQueue was empty.
-                s.getManagerDelegate().flushSession(s);
+            writeRequestQueueSize = writeRequestQueue.size();
+        }
+
+        if (writeRequestQueueSize == 1 && session.getTrafficMask().isWritable()) {
+            // Notify SocketIoProcessor only when writeRequestQueue was empty.
+            IoService service = s.getService();
+            if (service instanceof DatagramAcceptor) {
+                ((DatagramAcceptor) service).flushSession(s);
+            } else {
+                ((DatagramConnector) service).flushSession(s);
             }
         }
     }
@@ -59,11 +65,11 @@
     @Override
     protected void doClose(IoSession session) {
         DatagramSessionImpl s = (DatagramSessionImpl) session;
-        DatagramService manager = s.getManagerDelegate();
-        if (manager instanceof DatagramConnectorDelegate) {
-            ((DatagramConnectorDelegate) manager).closeSession(s);
+        IoService service = s.getService();
+        if (service instanceof DatagramConnector) {
+            ((DatagramConnector) service).closeSession(s);
         } else {
-            ((DatagramAcceptorDelegate) manager).getListeners()
+            ((DatagramAcceptor) service).getListeners()
                     .fireSessionDestroyed(session);
             session.getCloseFuture().setClosed();
         }

Added: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramServiceMetadata.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramServiceMetadata.java?view=auto&rev=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramServiceMetadata.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramServiceMetadata.java Tue Aug 14 06:16:29 2007
@@ -0,0 +1,37 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.net.InetSocketAddress;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.DefaultIoServiceMetadata;
+
+class DatagramServiceMetadata extends DefaultIoServiceMetadata {
+    static final DatagramServiceMetadata INSTANCE = new DatagramServiceMetadata();
+    
+    private DatagramServiceMetadata() {
+        super(
+                "datagram", true, false,
+                InetSocketAddress.class,
+                ByteBuffer.class,
+                DatagramSessionConfig.class);
+    }
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramServiceMetadata.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramServiceMetadata.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java (from r565678, mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java)
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java?view=diff&rev=565735&p1=mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java&r1=565678&p2=mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java Tue Aug 14 06:16:29 2007
@@ -17,7 +17,7 @@
  *  under the License. 
  *  
  */
-package org.apache.mina.transport.socket.nio.support;
+package org.apache.mina.transport.socket.nio;
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
@@ -30,19 +30,14 @@
 import org.apache.mina.common.AbstractIoSession;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoService;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionConfig;
 import org.apache.mina.common.RuntimeIOException;
-import org.apache.mina.common.IoServiceMetadata;
 import org.apache.mina.common.WriteFuture;
 import org.apache.mina.common.WriteRequest;
-import org.apache.mina.transport.socket.nio.DatagramSession;
-import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
-import org.apache.mina.transport.socket.nio.DefaultDatagramSessionConfig;
 
 /**
  * An {@link IoSession} for datagram transport (UDP/IP).
@@ -55,8 +50,6 @@
 
     private final DatagramSessionConfig config = new SessionConfigImpl();
 
-    private final DatagramService managerDelegate;
-
     private final DatagramFilterChain filterChain = new DatagramFilterChain(
             this);
 
@@ -77,11 +70,11 @@
     /**
      * Creates a new acceptor instance.
      */
-    DatagramSessionImpl(IoAcceptor service, DatagramService managerDelegate,
+    DatagramSessionImpl(
+            DatagramAcceptor service,
             DatagramChannel ch, IoHandler defaultHandler,
             InetSocketAddress remoteAddress) {
         this.service = service;
-        this.managerDelegate = managerDelegate;
         this.ch = ch;
         this.handler = defaultHandler;
         this.remoteAddress = remoteAddress;
@@ -89,7 +82,7 @@
         // We didn't set the localAddress by calling getLocalSocketAddress() to avoid
         // the case that getLocalSocketAddress() returns IPv6 address while
         // serviceAddress represents the same address in IPv4.
-        this.localAddress = (InetSocketAddress) service.getLocalAddress();
+        this.localAddress = service.getLocalAddress();
 
         applySettings();
     }
@@ -97,10 +90,9 @@
     /**
      * Creates a new connector instance.
      */
-    DatagramSessionImpl(IoConnector service, DatagramService managerDelegate,
+    DatagramSessionImpl(DatagramConnector service,
             DatagramChannel ch, IoHandler defaultHandler) {
         this.service = service;
-        this.managerDelegate = managerDelegate;
         this.ch = ch;
         this.handler = defaultHandler;
         this.remoteAddress = (InetSocketAddress) ch.socket()
@@ -135,10 +127,6 @@
         return config;
     }
 
-    DatagramService getManagerDelegate() {
-        return managerDelegate;
-    }
-
     public IoFilterChain getFilterChain() {
         return filterChain;
     }
@@ -161,8 +149,8 @@
 
     @Override
     protected void close0() {
-        if (managerDelegate instanceof IoAcceptor) {
-            ((DatagramAcceptorDelegate) managerDelegate).getSessionRecycler()
+        if (service instanceof IoAcceptor) {
+            ((DatagramAcceptor) service).getSessionRecycler()
                     .remove(this);
         }
         filterChain.fireFilterClose(this);
@@ -217,10 +205,6 @@
         return size;
     }
 
-    public IoServiceMetadata getTransportType() {
-        return IoServiceMetadata.DATAGRAM;
-    }
-
     public InetSocketAddress getRemoteAddress() {
         return remoteAddress;
     }
@@ -236,7 +220,9 @@
 
     @Override
     protected void updateTrafficMask() {
-        managerDelegate.updateTrafficMask(this);
+        if (service instanceof DatagramConnector) {
+            ((DatagramConnector) service).updateTrafficMask(this);
+        }
     }
 
     int getReadBufferSize() {

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java Tue Aug 14 06:16:29 2007
@@ -170,7 +170,7 @@
      * @see org.apache.mina.common.IoService#getMetadata()
      */
     public IoServiceMetadata getMetadata() {
-        return IoServiceMetadata.SOCKET;
+        return SocketServiceMetadata.INSTANCE;
     }
 
     /**
@@ -251,13 +251,14 @@
     protected void doBind() throws IOException {
         RegistrationRequest request = new RegistrationRequest();
 
+        // adds the Registration request to the queue for the Workers 
+        // to handle
+        registerQueue.offer(request);
+
         // creates an instance of a Worker and has the local 
         // executor kick it off.
         startupWorker();
 
-        // adds the Registration request to the queue for the Workers 
-        // to handle
-        registerQueue.offer(request);
         selector.wakeup();
 
         synchronized (request) {

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java Tue Aug 14 06:16:29 2007
@@ -124,7 +124,7 @@
     }
 
     public IoServiceMetadata getMetadata() {
-        return IoServiceMetadata.SOCKET;
+        return SocketServiceMetadata.INSTANCE;
     }
 
     @Override

Added: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketServiceMetadata.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketServiceMetadata.java?view=auto&rev=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketServiceMetadata.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketServiceMetadata.java Tue Aug 14 06:16:29 2007
@@ -0,0 +1,37 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.net.InetSocketAddress;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.DefaultIoServiceMetadata;
+
+class SocketServiceMetadata extends DefaultIoServiceMetadata {
+    static final SocketServiceMetadata INSTANCE = new SocketServiceMetadata();
+    
+    private SocketServiceMetadata() {
+        super(
+                "socket", false, true,
+                InetSocketAddress.class,
+                ByteBuffer.class,
+                SocketSessionConfig.class);
+    }
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketServiceMetadata.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketServiceMetadata.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java Tue Aug 14 06:16:29 2007
@@ -35,7 +35,6 @@
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionConfig;
 import org.apache.mina.common.RuntimeIOException;
-import org.apache.mina.common.IoServiceMetadata;
 import org.apache.mina.common.WriteRequest;
 
 /**
@@ -168,10 +167,6 @@
     @Override
     protected void write0(WriteRequest writeRequest) {
         filterChain.fireFilterWrite(this, writeRequest);
-    }
-
-    public IoServiceMetadata getTransportType() {
-        return IoServiceMetadata.SOCKET;
     }
 
     public InetSocketAddress getRemoteAddress() {

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java Tue Aug 14 06:16:29 2007
@@ -48,7 +48,7 @@
     }
 
     public IoServiceMetadata getMetadata() {
-        return IoServiceMetadata.VM_PIPE;
+        return VmPipeServiceMetadata.INSTANCE;
     }
 
     @Override

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java Tue Aug 14 06:16:29 2007
@@ -56,7 +56,7 @@
     }
 
     public IoServiceMetadata getMetadata() {
-        return IoServiceMetadata.VM_PIPE;
+        return VmPipeServiceMetadata.INSTANCE;
     }
 
     @Override

Added: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeServiceMetadata.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeServiceMetadata.java?view=auto&rev=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeServiceMetadata.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeServiceMetadata.java Tue Aug 14 06:16:29 2007
@@ -0,0 +1,34 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.transport.vmpipe;
+
+import org.apache.mina.common.DefaultIoServiceMetadata;
+
+class VmPipeServiceMetadata extends DefaultIoServiceMetadata {
+    static final VmPipeServiceMetadata INSTANCE = new VmPipeServiceMetadata();
+    
+    private VmPipeServiceMetadata() {
+        super(
+                "vm", false, false,
+                VmPipeAddress.class,
+                Object.class,
+                VmPipeSessionConfig.class);
+    }
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeServiceMetadata.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeServiceMetadata.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSession.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSession.java Tue Aug 14 06:16:29 2007
@@ -1,3 +1,22 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
 package org.apache.mina.transport.vmpipe;
 
 import org.apache.mina.common.IoSession;

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Tue Aug 14 06:16:29 2007
@@ -30,7 +30,6 @@
 import org.apache.mina.common.IoService;
 import org.apache.mina.common.IoServiceListenerSupport;
 import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoServiceMetadata;
 import org.apache.mina.common.WriteRequest;
 import org.apache.mina.transport.vmpipe.DefaultVmPipeSessionConfig;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
@@ -139,10 +138,6 @@
 
     public int getScheduledWriteBytes() {
         return 0;
-    }
-
-    public IoServiceMetadata getTransportType() {
-        return IoServiceMetadata.VM_PIPE;
     }
 
     public VmPipeAddress getRemoteAddress() {

Modified: mina/trunk/core/src/test/java/org/apache/mina/common/FutureTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/common/FutureTest.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/common/FutureTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/common/FutureTest.java Tue Aug 14 06:16:29 2007
@@ -24,19 +24,6 @@
 
 import junit.framework.TestCase;
 
-import org.apache.mina.common.AbstractIoSession;
-import org.apache.mina.common.DefaultCloseFuture;
-import org.apache.mina.common.DefaultConnectFuture;
-import org.apache.mina.common.DefaultWriteFuture;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoFuture;
-import org.apache.mina.common.IoFutureListener;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoService;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoSessionConfig;
-import org.apache.mina.common.IoServiceMetadata;
-
 /**
  * Tests {@link IoFuture} implementations.
  * 
@@ -77,10 +64,6 @@
             }
 
             public IoFilterChain getFilterChain() {
-                return null;
-            }
-
-            public IoServiceMetadata getTransportType() {
                 return null;
             }
 

Modified: mina/trunk/core/src/test/java/org/apache/mina/common/IoFilterChainTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/common/IoFilterChainTest.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/common/IoFilterChainTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/common/IoFilterChainTest.java Tue Aug 14 06:16:29 2007
@@ -238,10 +238,6 @@
             return null;
         }
 
-        public IoServiceMetadata getTransportType() {
-            return IoServiceMetadata.VM_PIPE;
-        }
-
         public SocketAddress getRemoteAddress() {
             return null;
         }
@@ -391,10 +387,6 @@
                 }
 
                 public IoFilterChain getFilterChain() {
-                    return null;
-                }
-
-                public IoServiceMetadata getTransportType() {
                     return null;
                 }
 

Modified: mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java Tue Aug 14 06:16:29 2007
@@ -327,5 +327,10 @@
         public IoServiceMetadata getTransportType() {
             return null;
         }
+        
+        @Override
+        public String toString() {
+            return String.valueOf(serviceAddress);
+        }
     }
 }

Modified: mina/trunk/core/src/test/java/org/apache/mina/filter/codec/CumulativeProtocolDecoderTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/filter/codec/CumulativeProtocolDecoderTest.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/filter/codec/CumulativeProtocolDecoderTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/filter/codec/CumulativeProtocolDecoderTest.java Tue Aug 14 06:16:29 2007
@@ -34,7 +34,6 @@
 import org.apache.mina.common.IoService;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionConfig;
-import org.apache.mina.common.IoServiceMetadata;
 
 /**
  * Tests {@link CumulativeProtocolDecoder}.
@@ -181,10 +180,6 @@
         @Override
         public CloseFuture close() {
             return null;
-        }
-
-        public IoServiceMetadata getTransportType() {
-            return IoServiceMetadata.SOCKET;
         }
 
         public SocketAddress getRemoteAddress() {

Modified: mina/trunk/core/src/test/java/org/apache/mina/handler/chain/ChainedIoHandlerTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/handler/chain/ChainedIoHandlerTest.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/handler/chain/ChainedIoHandlerTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/handler/chain/ChainedIoHandlerTest.java Tue Aug 14 06:16:29 2007
@@ -30,7 +30,6 @@
 import org.apache.mina.common.IoService;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionConfig;
-import org.apache.mina.common.IoServiceMetadata;
 
 /**
  * A test case for {@link ChainedIoHandler}.
@@ -64,10 +63,6 @@
             }
 
             public IoFilterChain getFilterChain() {
-                return null;
-            }
-
-            public IoServiceMetadata getTransportType() {
                 return null;
             }
 

Modified: mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java
URL: http://svn.apache.org/viewvc/mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java (original)
+++ mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java Tue Aug 14 06:16:29 2007
@@ -24,8 +24,8 @@
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoServiceMetadata;
 import org.apache.mina.filter.ssl.SSLFilter;
+import org.apache.mina.transport.socket.nio.SocketSession;
 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,9 +42,9 @@
 
     @Override
     public void sessionCreated(IoSession session) {
-        if (session.getTransportType() == IoServiceMetadata.SOCKET) {
-            ((SocketSessionConfig) session.getConfig())
-                    .setReceiveBufferSize(2048);
+        if (session instanceof SocketSession) {
+            SocketSessionConfig config = ((SocketSession) session).getConfig();
+            config.setReceiveBufferSize(2048);
         }
 
         session.setIdleTime(IdleStatus.BOTH_IDLE, 10);

Modified: mina/trunk/example/src/test/java/org/apache/mina/example/echoserver/ConnectorTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/example/src/test/java/org/apache/mina/example/echoserver/ConnectorTest.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/example/src/test/java/org/apache/mina/example/echoserver/ConnectorTest.java (original)
+++ mina/trunk/example/src/test/java/org/apache/mina/example/echoserver/ConnectorTest.java Tue Aug 14 06:16:29 2007
@@ -28,8 +28,8 @@
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
-import org.apache.mina.common.RuntimeIOException;
 import org.apache.mina.common.IoSessionLogger;
+import org.apache.mina.common.RuntimeIOException;
 import org.apache.mina.common.WriteFuture;
 import org.apache.mina.example.echoserver.ssl.BogusSSLContextFactory;
 import org.apache.mina.filter.ssl.SSLFilter;
@@ -178,7 +178,7 @@
 
             writeFuture = session.write(buf);
 
-            if (session.getTransportType().isConnectionless()) {
+            if (session.getService().getMetadata().isConnectionless()) {
                 // This will align message arrival order in connectionless transport types
                 waitForResponse(handler, (i + 1) * DATA_SIZE);
             }

Modified: mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java (original)
+++ mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java Tue Aug 14 06:16:29 2007
@@ -121,7 +121,7 @@
     }
 
     public IoServiceMetadata getMetadata() {
-        return SerialSession.serialTransportType;
+        return SerialSession.METADATA;
     }
 
     private SerialPort initializePort(String user, CommPortIdentifier portId,

Modified: mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java (original)
+++ mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java Tue Aug 14 06:16:29 2007
@@ -35,13 +35,13 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.DefaultIoServiceMetadata;
 import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IdleStatusChecker;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceMetadata;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionConfig;
-import org.apache.mina.common.IdleStatusChecker;
-import org.apache.mina.common.IoServiceMetadata;
 import org.apache.mina.common.WriteRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,8 +75,9 @@
 
     private final Logger log;
 
-    public static final IoServiceMetadata serialTransportType = new DefaultIoServiceMetadata(
-            "serial communication", false, SerialAddress.class,
+    static final IoServiceMetadata METADATA =
+        new DefaultIoServiceMetadata(
+            "serial", false, true, SerialAddress.class,
             ByteBuffer.class, SerialSessionConfig.class);
 
     SerialSession(IoService service, SerialAddress address, SerialPort port) {
@@ -151,10 +152,6 @@
 
     public IoService getService() {
         return service;
-    }
-
-    public IoServiceMetadata getTransportType() {
-        return serialTransportType;
     }
 
     protected void close0() {