You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/08/14 15:16:31 UTC
svn commit: r565735 - in /mina/trunk:
core/src/main/java/org/apache/mina/common/
core/src/main/java/org/apache/mina/filter/codec/
core/src/main/java/org/apache/mina/filter/codec/demux/
core/src/main/java/org/apache/mina/transport/socket/nio/ core/src/m...
Author: trustin
Date: Tue Aug 14 06:16:29 2007
New Revision: 565735
URL: http://svn.apache.org/viewvc?view=rev&rev=565735
Log:
* Renamed TransportType to IoServiceMetadata
* Added IoServiceMetadata.hasFragmentation()
* Added DefaultIoServiceMetadata
* Removed static fields in IoServiceMetadata which caused bad coupling
* Removed IoSession.getTransportType() because IoServiceMetadata can be retrieved via IoSession.getService().getMetadata()
* Removed the transport.socket.nio.support package and moved its classes to transport.socket.nio. The datagram transport no longer uses a wrapper.
* Fixed SocketBindTest, which was freezing
Added:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramFilterChain.java
- copied, changed from r565678, mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramServiceMetadata.java (with props)
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
- copied, changed from r565678, mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketServiceMetadata.java (with props)
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeServiceMetadata.java (with props)
Removed:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoServiceMetadata.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceMetadata.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java
mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
mina/trunk/core/src/main/java/org/apache/mina/filter/codec/demux/DemuxingProtocolCodecFactory.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSession.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
mina/trunk/core/src/test/java/org/apache/mina/common/FutureTest.java
mina/trunk/core/src/test/java/org/apache/mina/common/IoFilterChainTest.java
mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java
mina/trunk/core/src/test/java/org/apache/mina/filter/codec/CumulativeProtocolDecoderTest.java
mina/trunk/core/src/test/java/org/apache/mina/handler/chain/ChainedIoHandlerTest.java
mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java
mina/trunk/example/src/test/java/org/apache/mina/example/echoserver/ConnectorTest.java
mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java
mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoFilterChain.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoFilterChain.java Tue Aug 14 06:16:29 2007
@@ -567,9 +567,6 @@
@Override
public void filterWrite(NextFilter nextFilter, IoSession session,
WriteRequest writeRequest) throws Exception {
- // I used assertion here because isAssignableFrom takes a lot of CPU.
- assert session.getTransportType().getEnvelopeType()
- .isAssignableFrom(writeRequest.getMessage().getClass());
doWrite(session, writeRequest);
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java Tue Aug 14 06:16:29 2007
@@ -536,8 +536,22 @@
@Override
public String toString() {
- return "(" + getTransportType() + ", R: " + getRemoteAddress()
- + ", L: " + getLocalAddress() + ", S: " + getServiceAddress()
+ return "(" + getServiceName()
+ + ", R: " + getRemoteAddress()
+ + ", L: " + getLocalAddress()
+ + ", S: " + getServiceAddress()
+ ')';
+ }
+
+ private String getServiceName() {
+ if (getService() == null) {
+ return "null";
+ }
+
+ if (getService().getMetadata() == null) {
+ return "null";
+ }
+
+ return getService().getMetadata().getName();
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoServiceMetadata.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoServiceMetadata.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoServiceMetadata.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoServiceMetadata.java Tue Aug 14 06:16:29 2007
@@ -31,16 +31,16 @@
public class DefaultIoServiceMetadata implements IoServiceMetadata {
private final String name;
-
private final boolean connectionless;
-
+ private final boolean fragmentation;
private final Class<? extends SocketAddress> addressType;
-
private final Class<? extends Object> envelopeType;
-
private final Class<? extends IoSessionConfig> sessionConfigType;
- public DefaultIoServiceMetadata(String name, boolean connectionless,
+ public DefaultIoServiceMetadata(
+ String name,
+ boolean connectionless,
+ boolean fragmentation,
Class<? extends SocketAddress> addressType,
Class<? extends Object> envelopeType,
Class<? extends IoSessionConfig> sessionConfigType) {
@@ -62,13 +62,14 @@
if (envelopeType == null) {
throw new NullPointerException("envelopeType");
}
-
+
if (sessionConfigType == null) {
throw new NullPointerException("sessionConfigType");
}
this.name = name;
this.connectionless = connectionless;
+ this.fragmentation = fragmentation;
this.addressType = addressType;
this.envelopeType = envelopeType;
this.sessionConfigType = sessionConfigType;
@@ -92,6 +93,10 @@
public boolean isConnectionless() {
return connectionless;
+ }
+
+ public boolean hasFragmentation() {
+ return fragmentation;
}
@Override
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceMetadata.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceMetadata.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceMetadata.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceMetadata.java Tue Aug 14 06:16:29 2007
@@ -19,16 +19,10 @@
*/
package org.apache.mina.common;
-import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.mina.transport.vmpipe.VmPipeAddress;
-import org.apache.mina.transport.vmpipe.VmPipeSessionConfig;
-
/**
- * Represents a network transport type and its related metadata.
+ * Provides meta-information that describes an {@link IoService}.
*
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$
@@ -36,48 +30,36 @@
public interface IoServiceMetadata {
/**
- * A pre-defined transport type for TCP/IP socket transport.
- */
- static IoServiceMetadata SOCKET = new DefaultIoServiceMetadata("socket", false,
- InetSocketAddress.class, ByteBuffer.class,
- SocketSessionConfig.class);
-
- /**
- * A pre-defined transport type for UDP/IP datagram transport.
- */
- static IoServiceMetadata DATAGRAM = new DefaultIoServiceMetadata("datagram", true,
- InetSocketAddress.class, ByteBuffer.class,
- DatagramSessionConfig.class);
-
- /**
- * A pre-defined transport type for in-VM pipe transport.
- */
- static IoServiceMetadata VM_PIPE = new DefaultIoServiceMetadata("vmpipe", false,
- VmPipeAddress.class, Object.class, VmPipeSessionConfig.class);
-
- /**
- * Returns the name of this transport type.
+ * Returns the name of the service.
*/
String getName();
/**
* Returns <code>true</code> if the session of this transport type is
- * connectionless.
+ * <a href="http://en.wikipedia.org/wiki/Connectionless">connectionless</a>.
*/
boolean isConnectionless();
+
+ /**
+ * Returns {@code true} if the messages exchanged by the service can be
+ * <a href="http://en.wikipedia.org/wiki/IPv4#Fragmentation_and_reassembly">fragmented
+ * or reassembled</a> by its underlying transport.
+ */
+ boolean hasFragmentation();
/**
- * Returns the address type of this transport type.
+ * Returns the address type of the service.
*/
Class<? extends SocketAddress> getAddressType();
/**
- * Returns the type of the envelope message of this transport type.
+ * Returns the allowed message type when you write to an
+ * {@link IoSession} that is managed by the service.
*/
Class<? extends Object> getEnvelopeType();
/**
- * Returns the type of the {@link IoSessionConfig} of this transport type.
+ * Returns the type of the {@link IoSessionConfig} of the service
*/
Class<? extends IoSessionConfig> getSessionConfigType();
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java Tue Aug 14 06:16:29 2007
@@ -209,11 +209,6 @@
Set<String> getAttributeKeys();
/**
- * Returns transport type of this session.
- */
- IoServiceMetadata getTransportType();
-
- /**
* Returns <code>true</code> if this session is connected with remote peer.
*/
boolean isConnected();
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java Tue Aug 14 06:16:29 2007
@@ -218,7 +218,7 @@
throw pee;
} finally {
// Dispose the encoder if this session is connectionless.
- if (session.getTransportType().isConnectionless()) {
+ if (session.getService().getMetadata().isConnectionless()) {
disposeEncoder(session);
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/codec/demux/DemuxingProtocolCodecFactory.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/codec/demux/DemuxingProtocolCodecFactory.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/codec/demux/DemuxingProtocolCodecFactory.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/codec/demux/DemuxingProtocolCodecFactory.java Tue Aug 14 06:16:29 2007
@@ -151,7 +151,7 @@
// Do nothing by default; let users implement it as they want.
// This statement is just to avoid compiler warning. Please ignore.
- session.getTransportType();
+ session.getService();
}
private class ProtocolEncoderImpl implements ProtocolEncoder {
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java Tue Aug 14 06:16:29 2007
@@ -19,14 +19,30 @@
*/
package org.apache.mina.transport.socket.nio;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
-import org.apache.mina.common.IoAcceptorWrapper;
+import org.apache.mina.common.AbstractIoAcceptor;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.ExpiringIoSessionRecycler;
import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoServiceListenerSupport;
+import org.apache.mina.common.IoServiceMetadata;
+import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionRecycler;
-import org.apache.mina.transport.socket.nio.support.DatagramAcceptorDelegate;
+import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.common.WriteRequest;
+import org.apache.mina.util.NamePreservingRunnable;
import org.apache.mina.util.NewThreadExecutor;
/**
@@ -35,46 +51,179 @@
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$
*/
-public class DatagramAcceptor extends IoAcceptorWrapper {
+public class DatagramAcceptor extends AbstractIoAcceptor implements
+ IoAcceptor {
+ private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringIoSessionRecycler();
+
+ private static volatile int nextId = 0;
+
+ private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
+
+ private final Executor executor;
+
+ private final int id = nextId++;
+
+ private final Selector selector;
+
+ private DatagramChannel channel;
+
+ private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
+
+ private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();
+
+ private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
+
+ private Worker worker;
+
/**
- * Creates a new instance using a NewThreadExecutor
+ * Creates a new instance.
*/
public DatagramAcceptor() {
- init(new DatagramAcceptorDelegate(this, new NewThreadExecutor()));
+ this(new NewThreadExecutor());
}
/**
* Creates a new instance.
- *
- * @param executor Executor to use for launching threads
*/
public DatagramAcceptor(Executor executor) {
- init(new DatagramAcceptorDelegate(this, executor));
+ super(new DefaultDatagramSessionConfig());
+
+ // The default reuseAddress should be 'true' for an accepted socket.
+ getSessionConfig().setReuseAddress(true);
+
+ try {
+ this.selector = Selector.open();
+ } catch (IOException e) {
+ throw new RuntimeIOException("Failed to open a selector.", e);
+ }
+
+ this.executor = executor;
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ try {
+ selector.close();
+ } catch (IOException e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+
+ public IoServiceMetadata getMetadata() {
+ return DatagramServiceMetadata.INSTANCE;
}
@Override
public DatagramSessionConfig getSessionConfig() {
- return ((DatagramAcceptorDelegate) acceptor).getSessionConfig();
+ return (DatagramSessionConfig) super.getSessionConfig();
}
@Override
public InetSocketAddress getLocalAddress() {
- return ((DatagramAcceptorDelegate) acceptor).getLocalAddress();
+ return (InetSocketAddress) super.getLocalAddress();
}
- // This method is overriden to work around a problem with
- // bean property access mechanism.
+ @Override
+ protected void doBind() throws IOException {
+ RegistrationRequest request = new RegistrationRequest();
+
+ startupWorker();
+ registerQueue.offer(request);
+ selector.wakeup();
+
+ synchronized (request) {
+ while (!request.done) {
+ try {
+ request.wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ if (request.exception != null) {
+ throw (IOException) new IOException("Failed to bind")
+ .initCause(request.exception);
+ } else {
+ setLocalAddress(channel.socket().getLocalSocketAddress());
+ }
+ }
@Override
- public void setLocalAddress(SocketAddress localAddress) {
- super.setLocalAddress(localAddress);
+ protected void doUnbind() {
+ CancellationRequest request = new CancellationRequest();
+
+ startupWorker();
+ cancelQueue.offer(request);
+ selector.wakeup();
+
+ synchronized (request) {
+ while (!request.done) {
+ try {
+ request.wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ if (request.exception != null) {
+ throw new RuntimeException("Failed to unbind", request.exception);
+ }
+ }
+
+ public IoSession newSession(SocketAddress remoteAddress) {
+ if (remoteAddress == null) {
+ throw new NullPointerException("remoteAddress");
+ }
+
+ synchronized (bindLock) {
+ if (!isBound()) {
+ throw new IllegalStateException(
+ "Can't create a session from a unbound service.");
+ }
+
+ return newSessionWithoutLock(remoteAddress);
+ }
+ }
+
+ private IoSession newSessionWithoutLock(SocketAddress remoteAddress) {
+ Selector selector = this.selector;
+ DatagramChannel ch = this.channel;
+ SelectionKey key = ch.keyFor(selector);
+
+ IoSession session;
+ IoSessionRecycler sessionRecycler = getSessionRecycler();
+ synchronized (sessionRecycler) {
+ session = sessionRecycler.recycle(getLocalAddress(), remoteAddress);
+ if (session != null) {
+ return session;
+ }
+
+ // If a new session needs to be created.
+ DatagramSessionImpl datagramSession = new DatagramSessionImpl(
+ this, ch, getHandler(),
+ (InetSocketAddress) remoteAddress);
+ datagramSession.setSelectionKey(key);
+
+ getSessionRecycler().put(datagramSession);
+ session = datagramSession;
+ }
+
+ try {
+ buildFilterChain(session);
+ getListeners().fireSessionCreated(session);
+ } catch (Throwable t) {
+ ExceptionMonitor.getInstance().exceptionCaught(t);
+ }
+
+ return session;
}
/**
* Returns the {@link IoSessionRecycler} for this service.
*/
public IoSessionRecycler getSessionRecycler() {
- return ((DatagramAcceptorDelegate) acceptor).getSessionRecycler();
+ return sessionRecycler;
}
/**
@@ -83,7 +232,299 @@
* @param sessionRecycler <tt>null</tt> to use the default recycler
*/
public void setSessionRecycler(IoSessionRecycler sessionRecycler) {
- ((DatagramAcceptorDelegate) acceptor)
- .setSessionRecycler(sessionRecycler);
+ synchronized (bindLock) {
+ if (isBound()) {
+ throw new IllegalStateException(
+ "sessionRecycler can't be set while the acceptor is bound.");
+ }
+
+ if (sessionRecycler == null) {
+ sessionRecycler = DEFAULT_RECYCLER;
+ }
+ this.sessionRecycler = sessionRecycler;
+ }
+ }
+
+ @Override
+ protected IoServiceListenerSupport getListeners() {
+ return super.getListeners();
+ }
+
+ private void buildFilterChain(IoSession session) throws Exception {
+ this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ }
+
+ private synchronized void startupWorker() {
+ if (worker == null) {
+ worker = new Worker();
+ executor.execute(new NamePreservingRunnable(worker));
+ }
+ }
+
+ void flushSession(DatagramSessionImpl session) {
+ scheduleFlush(session);
+ Selector selector = this.selector;
+ if (selector != null) {
+ selector.wakeup();
+ }
+ }
+
+ private void scheduleFlush(DatagramSessionImpl session) {
+ flushingSessions.offer(session);
+ }
+
+ private class Worker implements Runnable {
+ public void run() {
+ Thread.currentThread().setName("DatagramAcceptor-" + id);
+
+ for (;;) {
+ try {
+ int nKeys = selector.select();
+
+ registerNew();
+
+ if (nKeys > 0) {
+ processReadySessions(selector.selectedKeys());
+ }
+
+ flushSessions();
+ cancelKeys();
+
+ if (selector.keys().isEmpty()) {
+ synchronized (DatagramAcceptor.this) {
+ if (selector.keys().isEmpty()
+ && registerQueue.isEmpty()
+ && cancelQueue.isEmpty()) {
+ worker = null;
+ break;
+ }
+ }
+ }
+ } catch (IOException e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ }
+ }
+ }
+ }
+ }
+
+ private void processReadySessions(Set<SelectionKey> keys) {
+ Iterator<SelectionKey> it = keys.iterator();
+ while (it.hasNext()) {
+ SelectionKey key = it.next();
+ it.remove();
+
+ DatagramChannel ch = (DatagramChannel) key.channel();
+
+ try {
+ if (key.isReadable()) {
+ readSession(ch);
+ }
+
+ if (key.isWritable()) {
+ for (IoSession session : getManagedSessions()) {
+ scheduleFlush((DatagramSessionImpl) session);
+ }
+ }
+ } catch (Throwable t) {
+ ExceptionMonitor.getInstance().exceptionCaught(t);
+ }
+ }
+ }
+
+ private void readSession(DatagramChannel channel) throws Exception {
+ ByteBuffer readBuf = ByteBuffer.allocate(getSessionConfig()
+ .getReceiveBufferSize());
+
+ SocketAddress remoteAddress = channel.receive(readBuf.buf());
+ if (remoteAddress != null) {
+ DatagramSessionImpl session = (DatagramSessionImpl) newSessionWithoutLock(remoteAddress);
+
+ readBuf.flip();
+
+ ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
+ newBuf.put(readBuf);
+ newBuf.flip();
+
+ session.increaseReadBytes(newBuf.remaining());
+ session.getFilterChain().fireMessageReceived(session, newBuf);
+ }
+ }
+
+ private void flushSessions() {
+ for (;;) {
+ DatagramSessionImpl session = flushingSessions.poll();
+ if (session == null) {
+ break;
+ }
+
+ try {
+ flush(session);
+ } catch (IOException e) {
+ session.getFilterChain().fireExceptionCaught(session, e);
+ }
+ }
+ }
+
+ private void flush(DatagramSessionImpl session) throws IOException {
+ DatagramChannel ch = session.getChannel();
+
+ Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
+
+ WriteRequest req;
+ for (;;) {
+ synchronized (writeRequestQueue) {
+ req = writeRequestQueue.peek();
+ }
+
+ if (req == null) {
+ break;
+ }
+
+ ByteBuffer buf = (ByteBuffer) req.getMessage();
+ if (buf.remaining() == 0) {
+ // pop and fire event
+ synchronized (writeRequestQueue) {
+ writeRequestQueue.poll();
+ }
+
+ session.increaseWrittenMessages();
+ buf.reset();
+ ((DatagramFilterChain) session.getFilterChain())
+ .fireMessageSent(session, req);
+ continue;
+ }
+
+ SelectionKey key = session.getSelectionKey();
+ if (key == null) {
+ scheduleFlush(session);
+ break;
+ }
+ if (!key.isValid()) {
+ continue;
+ }
+
+ SocketAddress destination = req.getDestination();
+ if (destination == null) {
+ destination = session.getRemoteAddress();
+ }
+
+ int writtenBytes = ch.send(buf.buf(), destination);
+
+ if (writtenBytes == 0) {
+ // Kernel buffer is full
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ } else if (writtenBytes > 0) {
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+
+ // pop and fire event
+ synchronized (writeRequestQueue) {
+ writeRequestQueue.poll();
+ }
+
+ session.increaseWrittenBytes(writtenBytes);
+ session.increaseWrittenMessages();
+ buf.reset();
+ session.getFilterChain().fireMessageSent(session, req);
+ }
+ }
+ }
+
+ private void registerNew() {
+ if (registerQueue.isEmpty()) {
+ return;
+ }
+
+ for (;;) {
+ RegistrationRequest req = registerQueue.poll();
+ if (req == null) {
+ break;
+ }
+
+ DatagramChannel ch = null;
+ try {
+ ch = DatagramChannel.open();
+ DatagramSessionConfig cfg = getSessionConfig();
+ ch.socket().setReuseAddress(cfg.isReuseAddress());
+ ch.socket().setBroadcast(cfg.isBroadcast());
+ ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
+ ch.socket().setSendBufferSize(cfg.getSendBufferSize());
+
+ if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {
+ ch.socket().setTrafficClass(cfg.getTrafficClass());
+ }
+
+ ch.configureBlocking(false);
+ ch.socket().bind(getLocalAddress());
+ ch.register(selector, SelectionKey.OP_READ, req);
+ this.channel = ch;
+
+ getListeners().fireServiceActivated();
+ } catch (Throwable t) {
+ req.exception = t;
+ } finally {
+ synchronized (req) {
+ req.done = true;
+ req.notify();
+ }
+
+ if (ch != null && req.exception != null) {
+ try {
+ ch.disconnect();
+ ch.close();
+ } catch (Throwable e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+ }
+ }
+
+ private void cancelKeys() {
+ for (;;) {
+ CancellationRequest request = cancelQueue.poll();
+ if (request == null) {
+ break;
+ }
+
+ DatagramChannel ch = this.channel;
+ this.channel = null;
+
+ // close the channel
+ try {
+ SelectionKey key = ch.keyFor(selector);
+ key.cancel();
+ selector.wakeup(); // wake up again to trigger thread death
+ ch.disconnect();
+ ch.close();
+ } catch (Throwable t) {
+ ExceptionMonitor.getInstance().exceptionCaught(t);
+ } finally {
+ synchronized (request) {
+ request.done = true;
+ request.notify();
+ }
+
+ if (request.exception == null) {
+ getListeners().fireServiceDeactivated();
+ }
+ }
+ }
+ }
+
+ private static class RegistrationRequest {
+ private Throwable exception;
+
+ private boolean done;
+ }
+
+ private static class CancellationRequest {
+ private boolean done;
+
+ private RuntimeException exception;
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java Tue Aug 14 06:16:29 2007
@@ -19,11 +19,29 @@
*/
package org.apache.mina.transport.socket.nio;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
-import org.apache.mina.common.IoConnectorWrapper;
+import org.apache.mina.common.AbstractIoConnector;
+import org.apache.mina.common.AbstractIoFilterChain;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.DefaultConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoConnector;
-import org.apache.mina.transport.socket.nio.support.DatagramConnectorDelegate;
+import org.apache.mina.common.IoServiceMetadata;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.common.WriteRequest;
+import org.apache.mina.util.NamePreservingRunnable;
import org.apache.mina.util.NewThreadExecutor;
/**
@@ -32,25 +50,409 @@
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$
*/
-public class DatagramConnector extends IoConnectorWrapper {
+public class DatagramConnector extends AbstractIoConnector {
+ private static volatile int nextId = 0;
+
+ private final Executor executor;
+
+ private final int id = nextId++;
+
+ private final Selector selector;
+
+ private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
+
+ private final Queue<DatagramSessionImpl> cancelQueue = new ConcurrentLinkedQueue<DatagramSessionImpl>();
+
+ private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
+
+ private final Queue<DatagramSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
+
+ private Worker worker;
+
/**
- * Creates a new instance using a NewThreadExecutor
+ * Creates a new instance.
*/
public DatagramConnector() {
- init(new DatagramConnectorDelegate(this, new NewThreadExecutor()));
+ this(new NewThreadExecutor());
}
/**
* Creates a new instance.
- *
- * @param executor Executor to use for launching threads
*/
public DatagramConnector(Executor executor) {
- init(new DatagramConnectorDelegate(this, executor));
+ super(new DefaultDatagramSessionConfig());
+
+ try {
+ this.selector = Selector.open();
+ } catch (IOException e) {
+ throw new RuntimeIOException("Failed to open a selector.", e);
+ }
+
+ this.executor = executor;
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ try {
+ selector.close();
+ } catch (IOException e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+
+ public IoServiceMetadata getMetadata() {
+ return DatagramServiceMetadata.INSTANCE;
}
@Override
public DatagramSessionConfig getSessionConfig() {
- return ((DatagramConnectorDelegate) connector).getSessionConfig();
+ return (DatagramSessionConfig) super.getSessionConfig();
+ }
+
+    /**
+     * Opens and configures a datagram channel connected to <tt>remoteAddress</tt>
+     * and queues it for registration with the selector.
+     *
+     * @param remoteAddress the peer address the channel is connected to
+     * @param localAddress  the address to bind to, or <tt>null</tt> to skip binding
+     * @return a future that is the queued registration request, or an already
+     *         failed future if the channel could not be set up
+     */
+    @Override
+    protected ConnectFuture doConnect(SocketAddress remoteAddress,
+            SocketAddress localAddress) {
+        DatagramChannel ch = null;
+        boolean initialized = false;
+        try {
+            ch = DatagramChannel.open();
+            DatagramSessionConfig cfg = getSessionConfig();
+
+            ch.socket().setReuseAddress(cfg.isReuseAddress());
+            ch.socket().setBroadcast(cfg.isBroadcast());
+            ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
+            ch.socket().setSendBufferSize(cfg.getSendBufferSize());
+
+            if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {
+                ch.socket().setTrafficClass(cfg.getTrafficClass());
+            }
+
+            if (localAddress != null) {
+                ch.socket().bind(localAddress);
+            }
+            ch.connect(remoteAddress);
+            ch.configureBlocking(false);
+            initialized = true;
+        } catch (IOException e) {
+            return DefaultConnectFuture.newFailedFuture(e);
+        } finally {
+            // Release the channel on any failure, checked or unchecked.
+            if (!initialized && ch != null) {
+                try {
+                    ch.disconnect();
+                    ch.close();
+                } catch (IOException e) {
+                    ExceptionMonitor.getInstance().exceptionCaught(e);
+                }
+            }
+        }
+
+        RegistrationRequest request = new RegistrationRequest(ch);
+
+        // Queue the request BEFORE making sure a worker is running. The worker
+        // decides to exit while holding the connector's monitor and only if
+        // registerQueue is empty; queuing first means it either sees this request
+        // or has already cleared 'worker', in which case startupWorker() starts a
+        // new one. In the opposite order an exiting worker could strand the request.
+        registerQueue.offer(request);
+        startupWorker();
+        selector.wakeup();
+
+        return request;
+    }
+
+    /**
+     * Starts the selector loop unless one is already running. Synchronized on
+     * the connector, the same monitor the Worker holds when it decides to exit.
+     */
+    private synchronized void startupWorker() {
+        if (worker != null) {
+            return;
+        }
+
+        Worker created = new Worker();
+        worker = created;
+        executor.execute(new NamePreservingRunnable(created));
+    }
+
+    /**
+     * Queues the given session for closing by the worker thread.
+     *
+     * @param session the session whose channel should be closed and key cancelled
+     */
+    void closeSession(DatagramSessionImpl session) {
+        // Queue first, then ensure a worker exists: a worker that is about to
+        // exit checks cancelQueue under the connector's monitor, so it either
+        // sees this entry or has cleared 'worker' and startupWorker() replaces it.
+        cancelQueue.offer(session);
+        startupWorker();
+        selector.wakeup();
+    }
+
+    /**
+     * Schedules the session for flushing and wakes the selector so that the
+     * worker processes the flush queue.
+     *
+     * @param session the session that has pending write requests
+     */
+    void flushSession(DatagramSessionImpl session) {
+        flushingSessions.offer(session);
+
+        Selector current = this.selector;
+        if (current == null) {
+            return;
+        }
+        current.wakeup();
+    }
+
+ // Adds the session to the queue drained by flushSessions(); does not wake the selector.
+ private void scheduleFlush(DatagramSessionImpl session) {
+ flushingSessions.offer(session);
+ }
+
+    /**
+     * Schedules a re-evaluation of the session's interest ops and wakes the
+     * selector so that the worker applies it.
+     *
+     * @param session the session whose traffic mask has changed
+     */
+    void updateTrafficMask(DatagramSessionImpl session) {
+        scheduleTrafficControl(session);
+        // A single guarded wakeup, as in flushSession(); the former second,
+        // unconditional wakeup() was redundant and bypassed the null guard.
+        Selector selector = this.selector;
+        if (selector != null) {
+            selector.wakeup();
+        }
+    }
+
+ // Adds the session to the queue drained by doUpdateTrafficMask(); does not wake the selector.
+ private void scheduleTrafficControl(DatagramSessionImpl session) {
+ trafficControllingSessions.offer(session);
+ }
+
+    /**
+     * Applies pending traffic-mask changes: for each queued session the
+     * interest ops of its selection key are recomputed and masked with the
+     * session's current traffic mask.
+     */
+    private void doUpdateTrafficMask() {
+        DatagramSessionImpl session;
+        while ((session = trafficControllingSessions.poll()) != null) {
+            SelectionKey key = session.getSelectionKey();
+
+            if (key == null) {
+                // The session has no key yet (suspend/resume was called before
+                // it was registered): put it back and retry on a later pass.
+                scheduleTrafficControl(session);
+                break;
+            }
+
+            // Nothing to do for a key that is no longer valid.
+            if (key.isValid()) {
+                // OP_READ is the baseline; OP_WRITE is added only while write
+                // requests are waiting, so that flushing gets triggered.
+                int wanted = SelectionKey.OP_READ;
+                Queue<WriteRequest> pending = session.getWriteRequestQueue();
+                synchronized (pending) {
+                    if (!pending.isEmpty()) {
+                        wanted |= SelectionKey.OP_WRITE;
+                    }
+                }
+
+                // Restrict the wanted ops to what the traffic mask allows.
+                int allowed = session.getTrafficMask().getInterestOps();
+                key.interestOps(wanted & allowed);
+            }
+        }
+    }
+
+ // Selector loop of this connector. An instance is handed to the executor by
+ // startupWorker() and leaves the loop once the selector has no keys and both
+ // registerQueue and cancelQueue are empty.
+ private class Worker implements Runnable {
+ public void run() {
+ Thread.currentThread().setName("DatagramConnector-" + id);
+
+ for (;;) {
+ try {
+ int nKeys = selector.select();
+
+ // Pick up channels queued by doConnect() and pending traffic-mask changes.
+ registerNew();
+ doUpdateTrafficMask();
+
+ if (nKeys > 0) {
+ processReadySessions(selector.selectedKeys());
+ }
+
+ flushSessions();
+ cancelKeys();
+
+ if (selector.keys().isEmpty()) {
+ // Re-check under the connector's monitor (the one startupWorker() takes)
+ // before clearing 'worker', so a later startupWorker() creates a new Worker.
+ synchronized (DatagramConnector.this) {
+ if (selector.keys().isEmpty()
+ && registerQueue.isEmpty()
+ && cancelQueue.isEmpty()) {
+ worker = null;
+ break;
+ }
+ }
+ }
+ } catch (IOException e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+
+ // Pause for a second before the next iteration (presumably to avoid
+ // spinning when the selector keeps failing).
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ // Interruption is ignored here; the loop simply continues.
+ }
+ }
+ }
+ }
+ }
+
+    /**
+     * Handles every selected key: reads from sessions that are readable and
+     * schedules a flush for sessions that are writable, honouring each
+     * session's traffic mask. Each key is removed from the set as it is visited.
+     *
+     * @param keys the selector's selected-key set
+     */
+    private void processReadySessions(Set<SelectionKey> keys) {
+        for (Iterator<SelectionKey> cursor = keys.iterator(); cursor.hasNext();) {
+            SelectionKey ready = cursor.next();
+            cursor.remove();
+
+            DatagramSessionImpl owner = (DatagramSessionImpl) ready.attachment();
+
+            boolean canRead = ready.isReadable()
+                    && owner.getTrafficMask().isReadable();
+            if (canRead) {
+                readSession(owner);
+            }
+
+            boolean canWrite = ready.isWritable()
+                    && owner.getTrafficMask().isWritable();
+            if (canWrite) {
+                scheduleFlush(owner);
+            }
+        }
+    }
+
+    /**
+     * Reads one datagram from the session's channel and, if any bytes were
+     * received, passes a right-sized copy of them to the filter chain.
+     * I/O failures are reported through the filter chain as well.
+     *
+     * @param session the session whose channel is readable
+     */
+    private void readSession(DatagramSessionImpl session) {
+        ByteBuffer incoming = ByteBuffer.allocate(session.getReadBufferSize());
+        try {
+            int count = session.getChannel().read(incoming.buf());
+            if (count <= 0) {
+                return;
+            }
+
+            // Copy the payload into a buffer of exactly the received size.
+            incoming.flip();
+            ByteBuffer message = ByteBuffer.allocate(incoming.limit());
+            message.put(incoming);
+            message.flip();
+
+            session.increaseReadBytes(count);
+            session.getFilterChain().fireMessageReceived(session, message);
+        } catch (IOException e) {
+            session.getFilterChain().fireExceptionCaught(session, e);
+        }
+    }
+
+    /**
+     * Drains the flush queue, flushing each session in turn. An I/O failure
+     * on one session is reported to that session's filter chain and does not
+     * stop the remaining sessions from being flushed.
+     */
+    private void flushSessions() {
+        DatagramSessionImpl next;
+        while ((next = flushingSessions.poll()) != null) {
+            try {
+                flush(next);
+            } catch (IOException e) {
+                next.getFilterChain().fireExceptionCaught(next, e);
+            }
+        }
+    }
+
+    /**
+     * Writes the session's queued write requests to its channel, in order.
+     * <p>
+     * The loop stops when the queue is empty, when the session has no
+     * selection key yet (the flush is rescheduled), when the key is no longer
+     * valid, or when the channel accepts no bytes (OP_WRITE is then registered
+     * so the selector triggers the next attempt).
+     *
+     * @param session the session to flush
+     * @throws IOException if writing to the channel fails
+     */
+    private void flush(DatagramSessionImpl session) throws IOException {
+        DatagramChannel ch = session.getChannel();
+
+        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
+
+        WriteRequest req;
+        for (;;) {
+            // Only peek: the request is removed once it has actually been sent.
+            synchronized (writeRequestQueue) {
+                req = writeRequestQueue.peek();
+            }
+
+            if (req == null) {
+                break;
+            }
+
+            ByteBuffer buf = (ByteBuffer) req.getMessage();
+            if (buf.remaining() == 0) {
+                // pop and fire event
+                synchronized (writeRequestQueue) {
+                    writeRequestQueue.poll();
+                }
+
+                session.increaseWrittenMessages();
+                buf.reset();
+                session.getFilterChain().fireMessageSent(session, req);
+                continue;
+            }
+
+            SelectionKey key = session.getSelectionKey();
+            if (key == null) {
+                // Not registered yet: retry on a later pass.
+                scheduleFlush(session);
+                break;
+            }
+            if (!key.isValid()) {
+                // The request is still at the head of the queue, so looping
+                // again would peek it forever; stop flushing this session.
+                break;
+            }
+
+            int writtenBytes = ch.write(buf.buf());
+
+            if (writtenBytes == 0) {
+                // Kernel buffer is full: ask to be told when the channel is
+                // writable again and stop, instead of retrying in a tight loop.
+                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                break;
+            } else if (writtenBytes > 0) {
+                key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+
+                // pop and fire event
+                synchronized (writeRequestQueue) {
+                    writeRequestQueue.poll();
+                }
+
+                session.increaseWrittenBytes(writtenBytes);
+                session.increaseWrittenMessages();
+                buf.reset();
+                session.getFilterChain().fireMessageSent(session, req);
+            }
+        }
+    }
+
+ // Drains registerQueue: for each queued request a session is created around the
+ // request's channel, the channel is registered with the selector for OP_READ, the
+ // filter chain is built and the sessionCreated event is fired. If any of these
+ // steps throws, the error is reported through the session's filter chain and the
+ // channel is disconnected and closed.
+ private void registerNew() {
+ for (;;) {
+ RegistrationRequest req = registerQueue.poll();
+ if (req == null) {
+ break;
+ }
+
+ DatagramSessionImpl session = new DatagramSessionImpl(
+ this, req.channel, getHandler());
+
+ // AbstractIoFilterChain will notify the connect future.
+ session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, req);
+
+ boolean success = false;
+ try {
+ SelectionKey key = req.channel.register(selector,
+ SelectionKey.OP_READ, session);
+
+ session.setSelectionKey(key);
+ buildFilterChain(session);
+ // The CONNECT_FUTURE attribute is cleared and notified here.
+ getListeners().fireSessionCreated(session);
+ success = true;
+ } catch (Throwable t) {
+ // The CONNECT_FUTURE attribute is cleared and notified here.
+ session.getFilterChain().fireExceptionCaught(session, t);
+ } finally {
+ // 'success' stays false when anything above threw; release the channel then.
+ if (!success) {
+ try {
+ req.channel.disconnect();
+ req.channel.close();
+ } catch (IOException e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+ }
+ }
+
+ // Lets this connector's filter chain builder populate the session's filter chain.
+ private void buildFilterChain(IoSession session) throws Exception {
+ getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ }
+
+    /**
+     * Drains the cancel queue: each queued session has its channel
+     * disconnected and closed, its listeners and close future notified, and
+     * its selection key cancelled.
+     */
+    private void cancelKeys() {
+        DatagramSessionImpl closing;
+        while ((closing = cancelQueue.poll()) != null) {
+            SelectionKey key = closing.getSelectionKey();
+            DatagramChannel channel = (DatagramChannel) key.channel();
+            try {
+                channel.disconnect();
+                channel.close();
+            } catch (IOException e) {
+                ExceptionMonitor.getInstance().exceptionCaught(e);
+            }
+
+            getListeners().fireSessionDestroyed(closing);
+            closing.getCloseFuture().setClosed();
+            key.cancel();
+            selector.wakeup(); // wake up again to trigger thread death
+        }
+    }
+
+ // A connect future that also carries the channel awaiting registration. doConnect()
+ // returns it to the caller and queues it; registerNew() reads the channel from it.
+ private static class RegistrationRequest extends DefaultConnectFuture {
+ private final DatagramChannel channel;
+
+ private RegistrationRequest(DatagramChannel channel) {
+ this.channel = channel;
+ }
 }
}
Copied: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramFilterChain.java (from r565678, mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java)
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramFilterChain.java?view=diff&rev=565735&p1=mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java&r1=565678&p2=mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramFilterChain.java&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramFilterChain.java Tue Aug 14 06:16:29 2007
@@ -17,13 +17,14 @@
* under the License.
*
*/
-package org.apache.mina.transport.socket.nio.support;
+package org.apache.mina.transport.socket.nio;
import java.util.Queue;
import org.apache.mina.common.AbstractIoFilterChain;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoService;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.WriteRequest;
@@ -42,16 +43,21 @@
protected void doWrite(IoSession session, WriteRequest writeRequest) {
DatagramSessionImpl s = (DatagramSessionImpl) session;
Queue<WriteRequest> writeRequestQueue = s.getWriteRequestQueue();
-
- // SocketIoProcessor.doFlush() will reset it after write is finished
- // because the buffer will be passed with messageSent event.
((ByteBuffer) writeRequest.getMessage()).mark();
+
+ int writeRequestQueueSize;
synchronized (writeRequestQueue) {
writeRequestQueue.offer(writeRequest);
- if (writeRequestQueue.size() == 1
- && session.getTrafficMask().isWritable()) {
- // Notify DatagramService only when writeRequestQueue was empty.
- s.getManagerDelegate().flushSession(s);
+ writeRequestQueueSize = writeRequestQueue.size();
+ }
+
+ if (writeRequestQueueSize == 1 && session.getTrafficMask().isWritable()) {
+ // Notify SocketIoProcessor only when writeRequestQueue was empty.
+ IoService service = s.getService();
+ if (service instanceof DatagramAcceptor) {
+ ((DatagramAcceptor) service).flushSession(s);
+ } else {
+ ((DatagramConnector) service).flushSession(s);
}
}
}
@@ -59,11 +65,11 @@
@Override
protected void doClose(IoSession session) {
DatagramSessionImpl s = (DatagramSessionImpl) session;
- DatagramService manager = s.getManagerDelegate();
- if (manager instanceof DatagramConnectorDelegate) {
- ((DatagramConnectorDelegate) manager).closeSession(s);
+ IoService service = s.getService();
+ if (service instanceof DatagramConnector) {
+ ((DatagramConnector) service).closeSession(s);
} else {
- ((DatagramAcceptorDelegate) manager).getListeners()
+ ((DatagramAcceptor) service).getListeners()
.fireSessionDestroyed(session);
session.getCloseFuture().setClosed();
}
Added: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramServiceMetadata.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramServiceMetadata.java?view=auto&rev=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramServiceMetadata.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramServiceMetadata.java Tue Aug 14 06:16:29 2007
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.net.InetSocketAddress;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.DefaultIoServiceMetadata;
+
+/**
+ * Metadata describing the datagram transport. Package-private singleton;
+ * use {@link #INSTANCE}.
+ */
+class DatagramServiceMetadata extends DefaultIoServiceMetadata {
+ static final DatagramServiceMetadata INSTANCE = new DatagramServiceMetadata();
+
+ private DatagramServiceMetadata() {
+ // NOTE(review): the meaning of the two boolean flags is defined by the
+ // DefaultIoServiceMetadata constructor, which is not visible here - confirm there.
+ super(
+ "datagram", true, false,
+ InetSocketAddress.class,
+ ByteBuffer.class,
+ DatagramSessionConfig.class);
+ }
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramServiceMetadata.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramServiceMetadata.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java (from r565678, mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java)
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java?view=diff&rev=565735&p1=mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java&r1=565678&p2=mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java Tue Aug 14 06:16:29 2007
@@ -17,7 +17,7 @@
* under the License.
*
*/
-package org.apache.mina.transport.socket.nio.support;
+package org.apache.mina.transport.socket.nio;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -30,19 +30,14 @@
import org.apache.mina.common.AbstractIoSession;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoService;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionConfig;
import org.apache.mina.common.RuntimeIOException;
-import org.apache.mina.common.IoServiceMetadata;
import org.apache.mina.common.WriteFuture;
import org.apache.mina.common.WriteRequest;
-import org.apache.mina.transport.socket.nio.DatagramSession;
-import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
-import org.apache.mina.transport.socket.nio.DefaultDatagramSessionConfig;
/**
* An {@link IoSession} for datagram transport (UDP/IP).
@@ -55,8 +50,6 @@
private final DatagramSessionConfig config = new SessionConfigImpl();
- private final DatagramService managerDelegate;
-
private final DatagramFilterChain filterChain = new DatagramFilterChain(
this);
@@ -77,11 +70,11 @@
/**
* Creates a new acceptor instance.
*/
- DatagramSessionImpl(IoAcceptor service, DatagramService managerDelegate,
+ DatagramSessionImpl(
+ DatagramAcceptor service,
DatagramChannel ch, IoHandler defaultHandler,
InetSocketAddress remoteAddress) {
this.service = service;
- this.managerDelegate = managerDelegate;
this.ch = ch;
this.handler = defaultHandler;
this.remoteAddress = remoteAddress;
@@ -89,7 +82,7 @@
// We didn't set the localAddress by calling getLocalSocketAddress() to avoid
// the case that getLocalSocketAddress() returns IPv6 address while
// serviceAddress represents the same address in IPv4.
- this.localAddress = (InetSocketAddress) service.getLocalAddress();
+ this.localAddress = service.getLocalAddress();
applySettings();
}
@@ -97,10 +90,9 @@
/**
* Creates a new connector instance.
*/
- DatagramSessionImpl(IoConnector service, DatagramService managerDelegate,
+ DatagramSessionImpl(DatagramConnector service,
DatagramChannel ch, IoHandler defaultHandler) {
this.service = service;
- this.managerDelegate = managerDelegate;
this.ch = ch;
this.handler = defaultHandler;
this.remoteAddress = (InetSocketAddress) ch.socket()
@@ -135,10 +127,6 @@
return config;
}
- DatagramService getManagerDelegate() {
- return managerDelegate;
- }
-
public IoFilterChain getFilterChain() {
return filterChain;
}
@@ -161,8 +149,8 @@
@Override
protected void close0() {
- if (managerDelegate instanceof IoAcceptor) {
- ((DatagramAcceptorDelegate) managerDelegate).getSessionRecycler()
+ if (service instanceof IoAcceptor) {
+ ((DatagramAcceptor) service).getSessionRecycler()
.remove(this);
}
filterChain.fireFilterClose(this);
@@ -217,10 +205,6 @@
return size;
}
- public IoServiceMetadata getTransportType() {
- return IoServiceMetadata.DATAGRAM;
- }
-
public InetSocketAddress getRemoteAddress() {
return remoteAddress;
}
@@ -236,7 +220,9 @@
@Override
protected void updateTrafficMask() {
- managerDelegate.updateTrafficMask(this);
+ if (service instanceof DatagramConnector) {
+ ((DatagramConnector) service).updateTrafficMask(this);
+ }
}
int getReadBufferSize() {
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java Tue Aug 14 06:16:29 2007
@@ -170,7 +170,7 @@
* @see org.apache.mina.common.IoService#getMetadata()
*/
public IoServiceMetadata getMetadata() {
- return IoServiceMetadata.SOCKET;
+ return SocketServiceMetadata.INSTANCE;
}
/**
@@ -251,13 +251,14 @@
protected void doBind() throws IOException {
RegistrationRequest request = new RegistrationRequest();
+ // adds the Registration request to the queue for the Workers
+ // to handle
+ registerQueue.offer(request);
+
// creates an instance of a Worker and has the local
// executor kick it off.
startupWorker();
- // adds the Registration request to the queue for the Workers
- // to handle
- registerQueue.offer(request);
selector.wakeup();
synchronized (request) {
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java Tue Aug 14 06:16:29 2007
@@ -124,7 +124,7 @@
}
public IoServiceMetadata getMetadata() {
- return IoServiceMetadata.SOCKET;
+ return SocketServiceMetadata.INSTANCE;
}
@Override
Added: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketServiceMetadata.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketServiceMetadata.java?view=auto&rev=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketServiceMetadata.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketServiceMetadata.java Tue Aug 14 06:16:29 2007
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.net.InetSocketAddress;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.DefaultIoServiceMetadata;
+
+/**
+ * Metadata describing the socket transport. Package-private singleton;
+ * use {@link #INSTANCE}.
+ */
+class SocketServiceMetadata extends DefaultIoServiceMetadata {
+ static final SocketServiceMetadata INSTANCE = new SocketServiceMetadata();
+
+ private SocketServiceMetadata() {
+ // NOTE(review): the meaning of the two boolean flags is defined by the
+ // DefaultIoServiceMetadata constructor, which is not visible here - confirm there.
+ super(
+ "socket", false, true,
+ InetSocketAddress.class,
+ ByteBuffer.class,
+ SocketSessionConfig.class);
+ }
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketServiceMetadata.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketServiceMetadata.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java Tue Aug 14 06:16:29 2007
@@ -35,7 +35,6 @@
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionConfig;
import org.apache.mina.common.RuntimeIOException;
-import org.apache.mina.common.IoServiceMetadata;
import org.apache.mina.common.WriteRequest;
/**
@@ -168,10 +167,6 @@
@Override
protected void write0(WriteRequest writeRequest) {
filterChain.fireFilterWrite(this, writeRequest);
- }
-
- public IoServiceMetadata getTransportType() {
- return IoServiceMetadata.SOCKET;
}
public InetSocketAddress getRemoteAddress() {
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java Tue Aug 14 06:16:29 2007
@@ -48,7 +48,7 @@
}
public IoServiceMetadata getMetadata() {
- return IoServiceMetadata.VM_PIPE;
+ return VmPipeServiceMetadata.INSTANCE;
}
@Override
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java Tue Aug 14 06:16:29 2007
@@ -56,7 +56,7 @@
}
public IoServiceMetadata getMetadata() {
- return IoServiceMetadata.VM_PIPE;
+ return VmPipeServiceMetadata.INSTANCE;
}
@Override
Added: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeServiceMetadata.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeServiceMetadata.java?view=auto&rev=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeServiceMetadata.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeServiceMetadata.java Tue Aug 14 06:16:29 2007
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.vmpipe;
+
+import org.apache.mina.common.DefaultIoServiceMetadata;
+
+/**
+ * Metadata describing the in-VM pipe transport. Package-private singleton;
+ * use {@link #INSTANCE}.
+ */
+class VmPipeServiceMetadata extends DefaultIoServiceMetadata {
+ static final VmPipeServiceMetadata INSTANCE = new VmPipeServiceMetadata();
+
+ private VmPipeServiceMetadata() {
+ // NOTE(review): the meaning of the two boolean flags is defined by the
+ // DefaultIoServiceMetadata constructor, which is not visible here - confirm there.
+ super(
+ "vm", false, false,
+ VmPipeAddress.class,
+ Object.class,
+ VmPipeSessionConfig.class);
+ }
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeServiceMetadata.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeServiceMetadata.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSession.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSession.java Tue Aug 14 06:16:29 2007
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.mina.transport.vmpipe;
import org.apache.mina.common.IoSession;
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Tue Aug 14 06:16:29 2007
@@ -30,7 +30,6 @@
import org.apache.mina.common.IoService;
import org.apache.mina.common.IoServiceListenerSupport;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoServiceMetadata;
import org.apache.mina.common.WriteRequest;
import org.apache.mina.transport.vmpipe.DefaultVmPipeSessionConfig;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
@@ -139,10 +138,6 @@
public int getScheduledWriteBytes() {
return 0;
- }
-
- public IoServiceMetadata getTransportType() {
- return IoServiceMetadata.VM_PIPE;
}
public VmPipeAddress getRemoteAddress() {
Modified: mina/trunk/core/src/test/java/org/apache/mina/common/FutureTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/common/FutureTest.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/common/FutureTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/common/FutureTest.java Tue Aug 14 06:16:29 2007
@@ -24,19 +24,6 @@
import junit.framework.TestCase;
-import org.apache.mina.common.AbstractIoSession;
-import org.apache.mina.common.DefaultCloseFuture;
-import org.apache.mina.common.DefaultConnectFuture;
-import org.apache.mina.common.DefaultWriteFuture;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoFuture;
-import org.apache.mina.common.IoFutureListener;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoService;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoSessionConfig;
-import org.apache.mina.common.IoServiceMetadata;
-
/**
* Tests {@link IoFuture} implementations.
*
@@ -77,10 +64,6 @@
}
public IoFilterChain getFilterChain() {
- return null;
- }
-
- public IoServiceMetadata getTransportType() {
return null;
}
Modified: mina/trunk/core/src/test/java/org/apache/mina/common/IoFilterChainTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/common/IoFilterChainTest.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/common/IoFilterChainTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/common/IoFilterChainTest.java Tue Aug 14 06:16:29 2007
@@ -238,10 +238,6 @@
return null;
}
- public IoServiceMetadata getTransportType() {
- return IoServiceMetadata.VM_PIPE;
- }
-
public SocketAddress getRemoteAddress() {
return null;
}
@@ -391,10 +387,6 @@
}
public IoFilterChain getFilterChain() {
- return null;
- }
-
- public IoServiceMetadata getTransportType() {
return null;
}
Modified: mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java Tue Aug 14 06:16:29 2007
@@ -327,5 +327,10 @@
public IoServiceMetadata getTransportType() {
return null;
}
+
+ @Override
+ public String toString() {
+ return String.valueOf(serviceAddress);
+ }
}
}
Modified: mina/trunk/core/src/test/java/org/apache/mina/filter/codec/CumulativeProtocolDecoderTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/filter/codec/CumulativeProtocolDecoderTest.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/filter/codec/CumulativeProtocolDecoderTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/filter/codec/CumulativeProtocolDecoderTest.java Tue Aug 14 06:16:29 2007
@@ -34,7 +34,6 @@
import org.apache.mina.common.IoService;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionConfig;
-import org.apache.mina.common.IoServiceMetadata;
/**
* Tests {@link CumulativeProtocolDecoder}.
@@ -181,10 +180,6 @@
@Override
public CloseFuture close() {
return null;
- }
-
- public IoServiceMetadata getTransportType() {
- return IoServiceMetadata.SOCKET;
}
public SocketAddress getRemoteAddress() {
Modified: mina/trunk/core/src/test/java/org/apache/mina/handler/chain/ChainedIoHandlerTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/handler/chain/ChainedIoHandlerTest.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/handler/chain/ChainedIoHandlerTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/handler/chain/ChainedIoHandlerTest.java Tue Aug 14 06:16:29 2007
@@ -30,7 +30,6 @@
import org.apache.mina.common.IoService;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionConfig;
-import org.apache.mina.common.IoServiceMetadata;
/**
* A test case for {@link ChainedIoHandler}.
@@ -64,10 +63,6 @@
}
public IoFilterChain getFilterChain() {
- return null;
- }
-
- public IoServiceMetadata getTransportType() {
return null;
}
Modified: mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java
URL: http://svn.apache.org/viewvc/mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java (original)
+++ mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java Tue Aug 14 06:16:29 2007
@@ -24,8 +24,8 @@
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoServiceMetadata;
import org.apache.mina.filter.ssl.SSLFilter;
+import org.apache.mina.transport.socket.nio.SocketSession;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,9 +42,9 @@
@Override
public void sessionCreated(IoSession session) {
- if (session.getTransportType() == IoServiceMetadata.SOCKET) {
- ((SocketSessionConfig) session.getConfig())
- .setReceiveBufferSize(2048);
+ if (session instanceof SocketSession) {
+ SocketSessionConfig config = ((SocketSession) session).getConfig();
+ config.setReceiveBufferSize(2048);
}
session.setIdleTime(IdleStatus.BOTH_IDLE, 10);
Modified: mina/trunk/example/src/test/java/org/apache/mina/example/echoserver/ConnectorTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/example/src/test/java/org/apache/mina/example/echoserver/ConnectorTest.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/example/src/test/java/org/apache/mina/example/echoserver/ConnectorTest.java (original)
+++ mina/trunk/example/src/test/java/org/apache/mina/example/echoserver/ConnectorTest.java Tue Aug 14 06:16:29 2007
@@ -28,8 +28,8 @@
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.RuntimeIOException;
import org.apache.mina.common.IoSessionLogger;
+import org.apache.mina.common.RuntimeIOException;
import org.apache.mina.common.WriteFuture;
import org.apache.mina.example.echoserver.ssl.BogusSSLContextFactory;
import org.apache.mina.filter.ssl.SSLFilter;
@@ -178,7 +178,7 @@
writeFuture = session.write(buf);
- if (session.getTransportType().isConnectionless()) {
+ if (session.getService().getMetadata().isConnectionless()) {
// This will align message arrival order in connectionless transport types
waitForResponse(handler, (i + 1) * DATA_SIZE);
}
Modified: mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java (original)
+++ mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java Tue Aug 14 06:16:29 2007
@@ -121,7 +121,7 @@
}
public IoServiceMetadata getMetadata() {
- return SerialSession.serialTransportType;
+ return SerialSession.METADATA;
}
private SerialPort initializePort(String user, CommPortIdentifier portId,
Modified: mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java?view=diff&rev=565735&r1=565734&r2=565735
==============================================================================
--- mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java (original)
+++ mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java Tue Aug 14 06:16:29 2007
@@ -35,13 +35,13 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.DefaultIoServiceMetadata;
import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IdleStatusChecker;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceMetadata;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionConfig;
-import org.apache.mina.common.IdleStatusChecker;
-import org.apache.mina.common.IoServiceMetadata;
import org.apache.mina.common.WriteRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,8 +75,9 @@
private final Logger log;
- public static final IoServiceMetadata serialTransportType = new DefaultIoServiceMetadata(
- "serial communication", false, SerialAddress.class,
+ static final IoServiceMetadata METADATA =
+ new DefaultIoServiceMetadata(
+ "serial", false, true, SerialAddress.class,
ByteBuffer.class, SerialSessionConfig.class);
SerialSession(IoService service, SerialAddress address, SerialPort port) {
@@ -151,10 +152,6 @@
public IoService getService() {
return service;
- }
-
- public IoServiceMetadata getTransportType() {
- return serialTransportType;
}
protected void close0() {