You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2007/01/10 09:13:44 UTC
svn commit: r494760 - in
/mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial:
SerialConnector.java SerialFilterChain.java SerialPortAddress.java
SerialSession.java
Author: jvermillard
Date: Wed Jan 10 00:13:43 2007
New Revision: 494760
URL: http://svn.apache.org/viewvc?view=rev&rev=494760
Log:
first version with 1 read and 1 write thread per serial connection
Modified:
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialFilterChain.java
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialPortAddress.java
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java
Modified: mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java?view=diff&rev=494760&r1=494759&r2=494760
==============================================================================
--- mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java (original)
+++ mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java Wed Jan 10 00:13:43 2007
@@ -5,14 +5,16 @@
import java.util.Enumeration;
import java.util.TooManyListenersException;
-import javax.comm.CommPortIdentifier;
-import javax.comm.PortInUseException;
-import javax.comm.SerialPort;
-import javax.comm.UnsupportedCommOperationException;
+import gnu.io.CommPortIdentifier;
+import gnu.io.PortInUseException;
+import gnu.io.SerialPort;
+import gnu.io.UnsupportedCommOperationException;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoSessionConfig;
import org.apache.mina.common.support.BaseIoConnector;
+import org.apache.mina.common.support.DefaultConnectFuture;
+import org.apache.mina.common.support.IoServiceListenerSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +37,13 @@
protected Class<? extends IoSessionConfig> getSessionConfigType() {
return SerialSessionConfig.class;
}
+
+ @Override
+ protected IoServiceListenerSupport getListeners()
+ {
+ return super.getListeners();
+ }
+
@Override
protected ConnectFuture doConnect(SocketAddress remoteAddress, SocketAddress localAddress) {
@@ -74,28 +83,35 @@
// TODO : receive threshold
// serialPort.enableReceiveThreshold(receiveThreshold); /* bytes */
- // TODO : reveive Timeout serialPort.enableReceiveTimeout(10); /* milliseconds */
-
- // TODO : create serial session
- outputStream = serialPort
- .getOutputStream();
- inputStream = serialPort.getInputStream();
+ // TODO : reveive Timeout serialPort.enableReceiveTimeout(10); /* milliseconds */
serialPort.notifyOnDataAvailable(true);
- serialPort.addEventListener(this);
+ ConnectFuture future = new DefaultConnectFuture();
+ SerialSession session = new SerialSession(this,portAddress,serialPort);
+ session.start();
+ future.setSession( session );
+ return future;
} catch (PortInUseException e) {
- e.printStackTrace();
- } catch (UnsupportedCommOperationException e1) {
- e1.printStackTrace();
+ if(log.isDebugEnabled())
+ log.debug("Port In Use Exception : ",e);
+ return DefaultConnectFuture.newFailedFuture(e);
+ } catch (UnsupportedCommOperationException e) {
+ if(log.isDebugEnabled())
+ log.debug("Comm Exception : ",e);
+ return DefaultConnectFuture.newFailedFuture(e);
} catch (IOException e) {
- e.printStackTrace();
+ if(log.isDebugEnabled())
+ log.debug("IOException : ",e);
+ return DefaultConnectFuture.newFailedFuture(e);
} catch (TooManyListenersException e) {
- e.printStackTrace();
+ if(log.isDebugEnabled())
+ log.debug("TooManyListenersException : ",e);
+ return DefaultConnectFuture.newFailedFuture(e);
}
}
}
}
- return null;
+ return DefaultConnectFuture.newFailedFuture(new RuntimeException("Serial port not found"));
}
-}
+}
\ No newline at end of file
Modified: mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialFilterChain.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialFilterChain.java?view=diff&rev=494760&r1=494759&r2=494760
==============================================================================
--- mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialFilterChain.java (original)
+++ mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialFilterChain.java Wed Jan 10 00:13:43 2007
@@ -1,5 +1,8 @@
package org.apache.mina.transport.serial;
+import java.util.Queue;
+
+import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.AbstractIoFilterChain;
@@ -12,14 +15,27 @@
@Override
protected void doClose(IoSession session) throws Exception {
- // TODO Auto-generated method stub
-
+ ((SerialSession)session).closeSerialPort();
}
@Override
protected void doWrite(IoSession session, WriteRequest writeRequest) throws Exception {
- // TODO Auto-generated method stub
-
+ SerialSession s=(SerialSession)session;
+ Queue<WriteRequest> queue = s.getWriteRequestQueue();
+
+ ( ( ByteBuffer ) writeRequest.getMessage() ).mark();
+ // SocketIoProcessor.doFlush() will reset it after write is finished
+ // because the buffer will be passed with messageSent event.
+ ( ( ByteBuffer ) writeRequest.getMessage() ).mark();
+ synchronized( queue )
+ {
+ queue.offer( writeRequest );
+ if( queue.size() == 1 && session.getTrafficMask().isWritable() )
+ {
+ // Notify serial session worker only when writeRequestQueue was empty.
+ s.notifyWriteWorker();
+ }
+ }
}
}
Modified: mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialPortAddress.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialPortAddress.java?view=diff&rev=494760&r1=494759&r2=494760
==============================================================================
--- mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialPortAddress.java (original)
+++ mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialPortAddress.java Wed Jan 10 00:13:43 2007
@@ -24,7 +24,6 @@
import java.security.InvalidParameterException;
import javax.comm.SerialPort;
-import javax.naming.directory.InvalidAttributesException;
public class SerialPortAddress extends SocketAddress {
@@ -109,13 +108,6 @@
public String toString() {
return "serial("+name+",bauds:"+bauds+",databits:"+dataBits+",stopbits:"+stopBits+",parity:"+parity+",flowcontrol:"+flowControl+")";
}
-
- public static void main(String[] args) {
- SerialPortAddress addy=new SerialPortAddress("/dev/ttyS0",9600,DataBits.DATABITS_8,StopBits.STOP_BITS_1,Parity.PARITY_NONE,FlowControl.FLOWCONTROL_NONE);
- System.err.println("serial : "+addy);
-
- }
-
int getDataBitsForRXTX()
{
Modified: mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java?view=diff&rev=494760&r1=494759&r2=494760
==============================================================================
--- mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java (original)
+++ mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java Wed Jan 10 00:13:43 2007
@@ -1,9 +1,19 @@
package org.apache.mina.transport.serial;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.net.SocketAddress;
import java.util.LinkedList;
import java.util.Queue;
+import java.util.TooManyListenersException;
+import gnu.io.SerialPort;
+import gnu.io.SerialPortEvent;
+import gnu.io.SerialPortEventListener;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoService;
@@ -11,28 +21,46 @@
import org.apache.mina.common.TransportType;
import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.BaseIoSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class SerialSession extends BaseIoSession {
+public class SerialSession extends BaseIoSession implements
+ SerialPortEventListener {
private SerialSessionConfig config;
+
private IoHandler ioHandler;
+
private IoFilterChain filterChain;
+
private IoService service;
+
private SerialPortAddress address;
- private final Queue<WriteRequest> writeRequestQueue;
-
- public SerialSession(IoService service, SerialPortAddress address)
- {
+
+ private final Queue<WriteRequest> writeRequestQueue;
+
+ private InputStream inputStream;
+
+ private OutputStream outputStream;
+
+ private SerialPort port;
+
+ private Logger log;
+
+ SerialSession(IoService service, SerialPortAddress address, SerialPort port) {
this.service = service;
this.ioHandler = service.getHandler();
- this.filterChain = new SerialFilterChain( this );
+ this.filterChain = new SerialFilterChain(this);
this.writeRequestQueue = new LinkedList<WriteRequest>();
+ this.port = port;
+
+ log = LoggerFactory.getLogger(SerialSession.class);
}
-
+
@Override
protected void updateTrafficMask() {
// TODO Auto-generated method stub
-
+
}
public IoSessionConfig getConfig() {
@@ -55,14 +83,27 @@
return address;
}
- public int getScheduledWriteBytes() {
- // TODO Auto-generated method stub
- return 0;
+ Queue<WriteRequest> getWriteRequestQueue() {
+ return writeRequestQueue;
}
public int getScheduledWriteMessages() {
- // TODO Auto-generated method stub
- return 0;
+ synchronized (writeRequestQueue) {
+ return writeRequestQueue.size();
+ }
+ }
+
+ public int getScheduledWriteBytes() {
+ int size = 0;
+ synchronized (writeRequestQueue) {
+ for (Object o : writeRequestQueue) {
+ if (o instanceof ByteBuffer) {
+ size += ((ByteBuffer) o).remaining();
+ }
+ }
+ }
+
+ return size;
}
public IoService getService() {
@@ -71,5 +112,148 @@
public TransportType getTransportType() {
return TransportType.getInstance("SERIAL");
+ }
+
+ protected void close0()
+ {
+ filterChain.fireFilterClose( this );
+ }
+
+ /**
+ * start handling streams
+ *
+ * @throws IOException
+ * @throws TooManyListenersException
+ */
+ void start() throws IOException, TooManyListenersException {
+ inputStream = port.getInputStream();
+ outputStream = port.getOutputStream();
+ ReadWorker w = new ReadWorker();
+ w.start();
+ port.addEventListener(this);
+ ((SerialConnector)getService()).getListeners().fireSessionCreated(this);
+ }
+
+ private Object writeMonitor = new Object();
+
+ private WriteWorker writeWorker;
+
+ private class WriteWorker extends Thread {
+ public void run() {
+ while (isConnected() && !isClosing()) {
+ flushWrites();
+
+ // wait for more data
+ try {
+ writeMonitor.wait();
+ } catch (InterruptedException e) {
+ log.error("InterruptedException", e);
+ }
+ }
+ }
+ }
+
+ private void flushWrites() {
+ for (;;) {
+ WriteRequest req;
+
+ synchronized (writeRequestQueue) {
+ req = (WriteRequest) writeRequestQueue.peek();
+ }
+
+ if (req == null)
+ break;
+
+ ByteBuffer buf = (ByteBuffer) req.getMessage();
+ if (buf.remaining() == 0) {
+ synchronized (writeRequestQueue) {
+ writeRequestQueue.poll();
+ }
+ this.increaseWrittenMessages();
+
+ buf.reset();
+
+ this.getFilterChain().fireMessageSent(this, req);
+ continue;
+ }
+
+ int writtenBytes = buf.remaining();
+ try {
+ outputStream.write(buf.array());
+ this.increaseWrittenBytes(writtenBytes);
+ } catch (IOException e) {
+ this.getFilterChain().fireExceptionCaught(this, e);
+ }
+ }
+ }
+
+ void notifyWriteWorker() {
+ if (writeWorker == null) {
+ writeWorker = new WriteWorker();
+ writeWorker.start();
+ } else {
+ synchronized (writeMonitor) {
+ writeMonitor.notifyAll();
+ }
+ }
+ }
+
+ private Object readReadyMonitor = new Object();
+
+ private class ReadWorker extends Thread {
+ @Override
+ public void run() {
+ while (isConnected() && !isClosing()) {
+ synchronized (readReadyMonitor) {
+ try {
+ readReadyMonitor.wait();
+ } catch (InterruptedException e) {
+ log.error("InterruptedException", e);
+ }
+ int dataSize;
+ try {
+ dataSize = inputStream.available();
+ byte[] data = new byte[dataSize];
+ int readBytes = inputStream.read(data);
+
+ if( readBytes > 0 )
+ {
+ increaseReadBytes( readBytes );
+ // TODO : check if it's the good allocation way
+ ByteBuffer buf = ByteBuffer.allocate( readBytes );
+ buf.put(data,0,readBytes);
+ getFilterChain().fireMessageReceived( SerialSession.this, buf );
+ }
+ } catch (IOException e) {
+ getFilterChain().fireExceptionCaught(
+ SerialSession.this, e);
+ }
+ }
+ }
+ }
+ }
+
+ public void serialEvent(SerialPortEvent evt) {
+ if (evt.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
+ synchronized (readReadyMonitor) {
+ readReadyMonitor.notifyAll();
+ }
+ }
+ }
+
+ public void closeSerialPort() {
+ try {
+ inputStream.close();
+ } catch (IOException e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ try {
+ outputStream.close();
+ } catch (IOException e) {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+
+ port.close();
+ ((SerialConnector)getService()).getListeners().fireSessionDestroyed(this);
}
}