You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2007/01/10 09:13:44 UTC

svn commit: r494760 - in /mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial: SerialConnector.java SerialFilterChain.java SerialPortAddress.java SerialSession.java

Author: jvermillard
Date: Wed Jan 10 00:13:43 2007
New Revision: 494760

URL: http://svn.apache.org/viewvc?view=rev&rev=494760
Log:
first version with 1 read  and 1 write thread per serial connection

Modified:
    mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java
    mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialFilterChain.java
    mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialPortAddress.java
    mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java

Modified: mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java?view=diff&rev=494760&r1=494759&r2=494760
==============================================================================
--- mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java (original)
+++ mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java Wed Jan 10 00:13:43 2007
@@ -5,14 +5,16 @@
 import java.util.Enumeration;
 import java.util.TooManyListenersException;
 
-import javax.comm.CommPortIdentifier;
-import javax.comm.PortInUseException;
-import javax.comm.SerialPort;
-import javax.comm.UnsupportedCommOperationException;
+import gnu.io.CommPortIdentifier;
+import gnu.io.PortInUseException;
+import gnu.io.SerialPort;
+import gnu.io.UnsupportedCommOperationException;
 
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IoSessionConfig;
 import org.apache.mina.common.support.BaseIoConnector;
+import org.apache.mina.common.support.DefaultConnectFuture;
+import org.apache.mina.common.support.IoServiceListenerSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +37,13 @@
 	protected Class<? extends IoSessionConfig> getSessionConfigType() {
 		return SerialSessionConfig.class;
 	}
+	
+	@Override	
+    protected IoServiceListenerSupport getListeners()
+    {
+        return super.getListeners();
+    }
+
 
 	@Override
 	protected ConnectFuture doConnect(SocketAddress remoteAddress, SocketAddress localAddress) {
@@ -74,28 +83,35 @@
 	
 						// TODO : receive threshold
 						//	serialPort.enableReceiveThreshold(receiveThreshold);  /* bytes */
-				       // TODO : reveive Timeout serialPort.enableReceiveTimeout(10); /* milliseconds */
-
-						// TODO : create serial session
-						outputStream = serialPort
-								.getOutputStream();
-						inputStream = serialPort.getInputStream();
+				        // TODO : reveive Timeout serialPort.enableReceiveTimeout(10); /* milliseconds */
 
 						serialPort.notifyOnDataAvailable(true);
-						serialPort.addEventListener(this);
 						
+						ConnectFuture future = new DefaultConnectFuture();
+						SerialSession session = new SerialSession(this,portAddress,serialPort);
+						session.start();
+						future.setSession( session );
+		                return future;
 					} catch (PortInUseException e) {
-						e.printStackTrace();
-					} catch (UnsupportedCommOperationException e1) {
-						e1.printStackTrace();
+						if(log.isDebugEnabled())
+							log.debug("Port In Use Exception : ",e);
+						return DefaultConnectFuture.newFailedFuture(e);						
+					} catch (UnsupportedCommOperationException e) {
+						if(log.isDebugEnabled())
+							log.debug("Comm Exception : ",e);
+						return DefaultConnectFuture.newFailedFuture(e);
 					} catch (IOException e) {
-						e.printStackTrace();
+						if(log.isDebugEnabled())
+							log.debug("IOException : ",e);
+						return DefaultConnectFuture.newFailedFuture(e);
 					} catch (TooManyListenersException e) {
-						e.printStackTrace();
+						if(log.isDebugEnabled())
+							log.debug("TooManyListenersException : ",e);
+						return DefaultConnectFuture.newFailedFuture(e);
 					}
 				}
 			}
 		}
-		return null;
+		return DefaultConnectFuture.newFailedFuture(new RuntimeException("Serial port not found"));
 	}
-}
+}
\ No newline at end of file

Modified: mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialFilterChain.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialFilterChain.java?view=diff&rev=494760&r1=494759&r2=494760
==============================================================================
--- mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialFilterChain.java (original)
+++ mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialFilterChain.java Wed Jan 10 00:13:43 2007
@@ -1,5 +1,8 @@
 package org.apache.mina.transport.serial;
 
+import java.util.Queue;
+
+import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.AbstractIoFilterChain;
@@ -12,14 +15,27 @@
 
 	@Override
 	protected void doClose(IoSession session) throws Exception {
-		// TODO Auto-generated method stub
-		
+		((SerialSession)session).closeSerialPort();
 	}
 
 	@Override
 	protected void doWrite(IoSession session, WriteRequest writeRequest) throws Exception {
-		// TODO Auto-generated method stub
-		
+		SerialSession s=(SerialSession)session;
+		Queue<WriteRequest> queue = s.getWriteRequestQueue();
+
+		( ( ByteBuffer ) writeRequest.getMessage() ).mark();
+        // SocketIoProcessor.doFlush() will reset it after write is finished
+        // because the buffer will be passed with messageSent event. 
+        ( ( ByteBuffer ) writeRequest.getMessage() ).mark();
+        synchronized( queue )
+        {
+            queue.offer( writeRequest );
+            if( queue.size() == 1 && session.getTrafficMask().isWritable() )
+            {
+                // Notify serial session worker only when writeRequestQueue was empty.
+                s.notifyWriteWorker();
+            }
+        }
 	}
 
 }

Modified: mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialPortAddress.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialPortAddress.java?view=diff&rev=494760&r1=494759&r2=494760
==============================================================================
--- mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialPortAddress.java (original)
+++ mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialPortAddress.java Wed Jan 10 00:13:43 2007
@@ -24,7 +24,6 @@
 import java.security.InvalidParameterException;
 
 import javax.comm.SerialPort;
-import javax.naming.directory.InvalidAttributesException;
 
 public class SerialPortAddress extends SocketAddress {
 	
@@ -109,13 +108,6 @@
 	public String toString() {
 		return "serial("+name+",bauds:"+bauds+",databits:"+dataBits+",stopbits:"+stopBits+",parity:"+parity+",flowcontrol:"+flowControl+")";
 	}
-	
-	public static void main(String[] args) {
-		SerialPortAddress addy=new SerialPortAddress("/dev/ttyS0",9600,DataBits.DATABITS_8,StopBits.STOP_BITS_1,Parity.PARITY_NONE,FlowControl.FLOWCONTROL_NONE);
-		System.err.println("serial : "+addy);
-	
-	}
-	
 	
 	int getDataBitsForRXTX()
 	{

Modified: mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java?view=diff&rev=494760&r1=494759&r2=494760
==============================================================================
--- mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java (original)
+++ mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java Wed Jan 10 00:13:43 2007
@@ -1,9 +1,19 @@
 package org.apache.mina.transport.serial;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.SocketAddress;
 import java.util.LinkedList;
 import java.util.Queue;
+import java.util.TooManyListenersException;
 
+import gnu.io.SerialPort;
+import gnu.io.SerialPortEvent;
+import gnu.io.SerialPortEventListener;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoService;
@@ -11,28 +21,46 @@
 import org.apache.mina.common.TransportType;
 import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.BaseIoSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class SerialSession extends BaseIoSession {
+public class SerialSession extends BaseIoSession implements
+		SerialPortEventListener {
 
 	private SerialSessionConfig config;
+
 	private IoHandler ioHandler;
+
 	private IoFilterChain filterChain;
+
 	private IoService service;
+
 	private SerialPortAddress address;
-    private final Queue<WriteRequest> writeRequestQueue;
-	
-	public SerialSession(IoService service, SerialPortAddress address) 
-	{
+
+	private final Queue<WriteRequest> writeRequestQueue;
+
+	private InputStream inputStream;
+
+	private OutputStream outputStream;
+
+	private SerialPort port;
+
+	private Logger log;
+
+	SerialSession(IoService service, SerialPortAddress address, SerialPort port) {
 		this.service = service;
 		this.ioHandler = service.getHandler();
-		this.filterChain = new SerialFilterChain( this );
+		this.filterChain = new SerialFilterChain(this);
 		this.writeRequestQueue = new LinkedList<WriteRequest>();
+		this.port = port;
+
+		log = LoggerFactory.getLogger(SerialSession.class);
 	}
-	
+
 	@Override
 	protected void updateTrafficMask() {
 		// TODO Auto-generated method stub
-		
+
 	}
 
 	public IoSessionConfig getConfig() {
@@ -55,14 +83,27 @@
 		return address;
 	}
 
-	public int getScheduledWriteBytes() {
-		// TODO Auto-generated method stub
-		return 0;
+	Queue<WriteRequest> getWriteRequestQueue() {
+		return writeRequestQueue;
 	}
 
 	public int getScheduledWriteMessages() {
-		// TODO Auto-generated method stub
-		return 0;
+		synchronized (writeRequestQueue) {
+			return writeRequestQueue.size();
+		}
+	}
+
+	public int getScheduledWriteBytes() {
+		int size = 0;
+		synchronized (writeRequestQueue) {
+			for (Object o : writeRequestQueue) {
+				if (o instanceof ByteBuffer) {
+					size += ((ByteBuffer) o).remaining();
+				}
+			}
+		}
+
+		return size;
 	}
 
 	public IoService getService() {
@@ -71,5 +112,148 @@
 
 	public TransportType getTransportType() {
 		return TransportType.getInstance("SERIAL");
+	}
+
+	protected void close0()
+    {
+        filterChain.fireFilterClose( this );
+    }
+	
+	/**
+	 * start handling streams
+	 * 
+	 * @throws IOException
+	 * @throws TooManyListenersException
+	 */
+	void start() throws IOException, TooManyListenersException {
+		inputStream = port.getInputStream();
+		outputStream = port.getOutputStream();
+		ReadWorker w = new ReadWorker();
+		w.start();
+		port.addEventListener(this);
+		((SerialConnector)getService()).getListeners().fireSessionCreated(this);
+	}
+
+	private Object writeMonitor = new Object();
+
+	private WriteWorker writeWorker;
+
+	private class WriteWorker extends Thread {
+		public void run() {
+			while (isConnected() && !isClosing()) {
+				flushWrites();
+
+				// wait for more data
+				try {
+					writeMonitor.wait();
+				} catch (InterruptedException e) {
+					log.error("InterruptedException", e);
+				}
+			}
+		}
+	}
+
+	private void flushWrites() {
+		for (;;) {
+			WriteRequest req;
+
+			synchronized (writeRequestQueue) {
+				req = (WriteRequest) writeRequestQueue.peek();
+			}
+
+			if (req == null)
+				break;
+
+			ByteBuffer buf = (ByteBuffer) req.getMessage();
+			if (buf.remaining() == 0) {
+				synchronized (writeRequestQueue) {
+					writeRequestQueue.poll();
+				}
+				this.increaseWrittenMessages();
+
+				buf.reset();
+
+				this.getFilterChain().fireMessageSent(this, req);
+				continue;
+			}
+
+			int writtenBytes = buf.remaining();
+			try {
+				outputStream.write(buf.array());
+				this.increaseWrittenBytes(writtenBytes);
+			} catch (IOException e) {
+				this.getFilterChain().fireExceptionCaught(this, e);
+			}
+		}
+	}
+
+	void notifyWriteWorker() {
+		if (writeWorker == null) {
+			writeWorker = new WriteWorker();
+			writeWorker.start();
+		} else {
+			synchronized (writeMonitor) {
+				writeMonitor.notifyAll();
+			}
+		}
+	}
+
+	private Object readReadyMonitor = new Object();
+
+	private class ReadWorker extends Thread {
+		@Override
+		public void run() {
+			while (isConnected() && !isClosing()) {
+				synchronized (readReadyMonitor) {
+					try {
+						readReadyMonitor.wait();
+					} catch (InterruptedException e) {
+						log.error("InterruptedException", e);
+					}
+					int dataSize;
+					try {
+						dataSize = inputStream.available();
+						byte[] data = new byte[dataSize];
+						int readBytes = inputStream.read(data);
+
+			            if( readBytes > 0 )
+			            {
+			            	increaseReadBytes( readBytes );
+			            	// TODO : check if it's the good allocation way
+			                ByteBuffer buf = ByteBuffer.allocate( readBytes );
+			                buf.put(data,0,readBytes);
+			                getFilterChain().fireMessageReceived( SerialSession.this, buf );
+			            }
+					} catch (IOException e) {
+						getFilterChain().fireExceptionCaught(
+								SerialSession.this, e);
+					}
+				}
+			}
+		}
+	}
+
+	public void serialEvent(SerialPortEvent evt) {
+		if (evt.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
+			synchronized (readReadyMonitor) {
+				readReadyMonitor.notifyAll();
+			}
+		}
+	}
+
+	public void closeSerialPort() {
+		try {
+			inputStream.close();
+		} catch (IOException e) {
+			ExceptionMonitor.getInstance().exceptionCaught(e);
+		}
+		try {
+			outputStream.close();
+		} catch (IOException e) {
+			ExceptionMonitor.getInstance().exceptionCaught(e);
+		}
+		
+		port.close();
+		((SerialConnector)getService()).getListeners().fireSessionDestroyed(this);
 	}
 }