You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2007/04/06 16:44:59 UTC
svn commit: r526184 - in
/mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial:
DefaultSerialSessionConfig.java SerialAddress.java SerialConnector.java
SerialFilterChain.java SerialSession.java SerialSessionConfig.java
Author: jvermillard
Date: Fri Apr 6 07:44:58 2007
New Revision: 526184
URL: http://svn.apache.org/viewvc?view=rev&rev=526184
Log:
Serial communications :
- managing IDLEness
- more session parameters
- fixed code style
- start of javadoc
Modified:
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/DefaultSerialSessionConfig.java
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialAddress.java
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialFilterChain.java
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java
mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSessionConfig.java
Modified: mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/DefaultSerialSessionConfig.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/DefaultSerialSessionConfig.java?view=diff&rev=526184&r1=526183&r2=526184
==============================================================================
--- mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/DefaultSerialSessionConfig.java (original)
+++ mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/DefaultSerialSessionConfig.java Fri Apr 6 07:44:58 2007
@@ -1,36 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.mina.transport.serial;
-public class DefaultSerialSessionConfig implements SerialSessionConfig
-{
-
- private int inputBufferSize=8;
-
- private boolean lowLatency=false;
-
- public DefaultSerialSessionConfig()
- {
-
- }
-
- @Override
- public Object clone()
- {
- return new DefaultSerialSessionConfig();
- }
-
- public int getInputBufferSize() {
- return inputBufferSize;
- }
-
- public boolean isLowLantecy() {
- return lowLatency;
- }
-
- public void setInputBufferSize(int bufferSize) {
- this.inputBufferSize=bufferSize;
- }
-
- public void setLowLatency(boolean lowLatency) {
- this.lowLatency=lowLatency;
- }
+/**
+ * The default configuration for a serial session {@link SerialSessionConfig}.
+ * <p>
+ * Holds the input buffer size, the low-latency flag, the receive threshold
+ * and the receive timeout. {@code SerialConnector} enables the threshold or
+ * the timeout on the port only when the stored value is zero or positive,
+ * and disables it otherwise.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev: 0 $, $Date: 0 $
+ */
+public class DefaultSerialSessionConfig implements SerialSessionConfig {
+
+    /** Receive threshold in bytes; starts at -1. */
+    private int receiveThreshold = -1;
+
+    /** Size handed to the port's input buffer; starts at 8. */
+    private int inputBufferSize = 8;
+
+    /** Receive timeout in milliseconds; starts at -1. */
+    private int receiveTimeout = -1;
+
+    /** Whether low latency is requested on the port; starts at false. */
+    private boolean lowLatency = false;
+
+    /** Creates a configuration holding the initial values of every field. */
+    public DefaultSerialSessionConfig() {
+
+    }
+
+    /**
+     * Returns an independent copy of this configuration that carries over
+     * every current setting (the previous implementation returned a fresh
+     * object and silently lost all configured values).
+     *
+     * @return a new {@link DefaultSerialSessionConfig} with the same values
+     */
+    @Override
+    public Object clone() {
+        DefaultSerialSessionConfig copy = new DefaultSerialSessionConfig();
+        copy.receiveThreshold = receiveThreshold;
+        copy.inputBufferSize = inputBufferSize;
+        copy.receiveTimeout = receiveTimeout;
+        copy.lowLatency = lowLatency;
+        return copy;
+    }
+
+    /** @return the configured input buffer size */
+    public int getInputBufferSize() {
+        return inputBufferSize;
+    }
+
+    /**
+     * The misspelled name is kept as is because callers use it.
+     *
+     * @return the low-latency flag
+     */
+    public boolean isLowLantecy() {
+        return lowLatency;
+    }
+
+    /** @param bufferSize the new input buffer size */
+    public void setInputBufferSize(int bufferSize) {
+        this.inputBufferSize = bufferSize;
+    }
+
+    /** @param lowLatency the new low-latency flag */
+    public void setLowLatency(boolean lowLatency) {
+        this.lowLatency = lowLatency;
+    }
+
+    /** @return the receive threshold in bytes */
+    public int getReceiveThreshold() {
+        return receiveThreshold;
+    }
+
+    /** @param bytes the new receive threshold in bytes */
+    public void setReceiveThreshold(int bytes) {
+        receiveThreshold = bytes;
+    }
+
+    /** @return the receive timeout in milliseconds */
+    public int getReceiveTimeout() {
+        return receiveTimeout;
+    }
+
+    /** @param milliseconds the new receive timeout in milliseconds */
+    public void setReceiveTimeout(int milliseconds) {
+        receiveTimeout = milliseconds;
+    }
}
Modified: mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialAddress.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialAddress.java?view=diff&rev=526184&r1=526183&r2=526184
==============================================================================
--- mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialAddress.java (original)
+++ mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialAddress.java Fri Apr 6 07:44:58 2007
@@ -20,158 +20,145 @@
package org.apache.mina.transport.serial;
+import gnu.io.SerialPort;
+
import java.net.SocketAddress;
import java.security.InvalidParameterException;
-import javax.comm.SerialPort;
-
+/**
+ * An address for a serial port communication.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev: 0 $, $Date: 0 $
+ */
public class SerialAddress extends SocketAddress {
-
- private static final long serialVersionUID = 1735370510442384505L;
- public enum DataBits
- {
- DATABITS_5,
- DATABITS_6,
- DATABITS_7,
- DATABITS_8
- }
-
- public enum Parity
- {
- NONE,
- ODD,
- EVEN,
- MARK,
- SPACE
- }
-
- public enum StopBits
- {
- BITS_1,
- BITS_2,
- BITS_1_5
- }
-
- public enum FlowControl {
- NONE,
- RTSCTS_IN,
- RTSCTS_OUT,
- XONXOFF_IN,
- XONXOFF_OUT
- }
-
- private String name;
- private int bauds;
- private int dataBits;
- private StopBits stopBits;
- private Parity parity;
- private FlowControl flowControl;
-
-
- public SerialAddress(String name, int bauds, int dataBits, StopBits stopBits,
- Parity parity, FlowControl flowControl)
- {
- super();
- this.name = name;
- this.bauds = bauds;
- this.dataBits = dataBits;
- this.stopBits = stopBits;
- this.parity = parity;
- this.flowControl = flowControl;
- }
-
- public int getBauds() {
- return bauds;
- }
-
- public int getDataBits() {
- return dataBits;
- }
-
- public FlowControl getFlowControl() {
- return flowControl;
- }
-
- public String getName() {
- return name;
- }
-
- public Parity getParity() {
- return parity;
- }
-
- public StopBits getStopBits() {
- return stopBits;
- }
-
- public String toString() {
- return "serial("+name+",bauds:"+bauds+",databits:"+dataBits+",stopbits:"+stopBits+",parity:"+parity+",flowcontrol:"+flowControl+")";
- }
-
- int getDataBitsForRXTX()
- {
- switch (dataBits)
- {
- case 5:
- return SerialPort.DATABITS_5;
- case 6:
- return SerialPort.DATABITS_6;
- case 7:
- return SerialPort.DATABITS_7;
- case 8:
- return SerialPort.DATABITS_8;
- }
- throw new InvalidParameterException( "broken databits");
- }
-
-
- int getStopBitsForRXTX()
- {
- switch (stopBits)
- {
- case BITS_1:
- return SerialPort.STOPBITS_1;
- case BITS_1_5:
- return SerialPort.STOPBITS_1_5;
- case BITS_2:
- return SerialPort.STOPBITS_2;
- }
- throw new InvalidParameterException( "broken stopbits");
- }
-
- int getParityForRXTX()
- {
- switch (parity)
- {
- case EVEN:
- return SerialPort.PARITY_EVEN;
- case MARK:
- return SerialPort.PARITY_MARK;
- case NONE:
- return SerialPort.PARITY_NONE;
- case ODD:
- return SerialPort.PARITY_ODD;
- case SPACE:
- return SerialPort.PARITY_SPACE;
- }
- throw new InvalidParameterException( "broken parity");
- }
-
- int getFLowControlForRXTX() {
- switch(flowControl)
- {
- case NONE:
- return SerialPort.FLOWCONTROL_NONE;
- case RTSCTS_IN:
- return SerialPort.FLOWCONTROL_RTSCTS_IN;
- case RTSCTS_OUT:
- return SerialPort.FLOWCONTROL_RTSCTS_OUT;
- case XONXOFF_IN:
- return SerialPort.FLOWCONTROL_XONXOFF_IN;
- case XONXOFF_OUT:
- return SerialPort.FLOWCONTROL_XONXOFF_OUT;
- }
- throw new InvalidParameterException( "broken stopbits");
- }
+    private static final long serialVersionUID = 1735370510442384505L;
+
+    /**
+     * Data bit settings. NOTE(review): the members of this class take the
+     * data bits as a plain int and do not reference this enum.
+     */
+    public enum DataBits {
+        DATABITS_5, DATABITS_6, DATABITS_7, DATABITS_8
+    }
+
+    /** Parity settings; translated by {@link #getParityForRXTX()}. */
+    public enum Parity {
+        NONE, ODD, EVEN, MARK, SPACE
+    }
+
+    /** Stop bit settings; translated by {@link #getStopBitsForRXTX()}. */
+    public enum StopBits {
+        BITS_1, BITS_2, BITS_1_5
+    }
+
+    /** Flow control settings; translated by {@link #getFLowControlForRXTX()}. */
+    public enum FlowControl {
+        NONE, RTSCTS_IN, RTSCTS_OUT, XONXOFF_IN, XONXOFF_OUT
+    }
+
+    // All fields are assigned once in the constructor and never modified.
+    private final String name;
+
+    private final int bauds;
+
+    private final int dataBits;
+
+    private final StopBits stopBits;
+
+    private final Parity parity;
+
+    private final FlowControl flowControl;
+
+    /**
+     * Creates an address describing a serial port and its line settings.
+     *
+     * @param name        the port name
+     * @param bauds       the baud rate
+     * @param dataBits    the number of data bits; only 5, 6, 7 and 8 are
+     *                    accepted by {@link #getDataBitsForRXTX()}
+     * @param stopBits    the stop bit setting
+     * @param parity      the parity setting
+     * @param flowControl the flow control setting
+     */
+    public SerialAddress(String name, int bauds, int dataBits,
+            StopBits stopBits, Parity parity, FlowControl flowControl) {
+        super();
+        this.name = name;
+        this.bauds = bauds;
+        this.dataBits = dataBits;
+        this.stopBits = stopBits;
+        this.parity = parity;
+        this.flowControl = flowControl;
+    }
+
+    /** @return the baud rate */
+    public int getBauds() {
+        return bauds;
+    }
+
+    /** @return the number of data bits as given to the constructor */
+    public int getDataBits() {
+        return dataBits;
+    }
+
+    /** @return the flow control setting */
+    public FlowControl getFlowControl() {
+        return flowControl;
+    }
+
+    /** @return the port name */
+    public String getName() {
+        return name;
+    }
+
+    /** @return the parity setting */
+    public Parity getParity() {
+        return parity;
+    }
+
+    /** @return the stop bit setting */
+    public StopBits getStopBits() {
+        return stopBits;
+    }
+
+    /** @return a one-line description listing every setting of this address */
+    @Override
+    public String toString() {
+        return "serial(" + name + ",bauds:" + bauds + ",databits:" + dataBits
+                + ",stopbits:" + stopBits + ",parity:" + parity
+                + ",flowcontrol:" + flowControl + ")";
+    }
+
+    /**
+     * Translates the data bit count into the matching {@link SerialPort}
+     * constant.
+     *
+     * @throws InvalidParameterException if the count is not 5, 6, 7 or 8
+     */
+    int getDataBitsForRXTX() {
+        switch (dataBits) {
+        case 5:
+            return SerialPort.DATABITS_5;
+        case 6:
+            return SerialPort.DATABITS_6;
+        case 7:
+            return SerialPort.DATABITS_7;
+        case 8:
+            return SerialPort.DATABITS_8;
+        }
+        throw new InvalidParameterException("broken databits");
+    }
+
+    /**
+     * Translates the stop bit setting into the matching {@link SerialPort}
+     * constant.
+     *
+     * @throws InvalidParameterException if no case matches
+     */
+    int getStopBitsForRXTX() {
+        switch (stopBits) {
+        case BITS_1:
+            return SerialPort.STOPBITS_1;
+        case BITS_1_5:
+            return SerialPort.STOPBITS_1_5;
+        case BITS_2:
+            return SerialPort.STOPBITS_2;
+        }
+        throw new InvalidParameterException("broken stopbits");
+    }
+
+    /**
+     * Translates the parity setting into the matching {@link SerialPort}
+     * constant.
+     *
+     * @throws InvalidParameterException if no case matches
+     */
+    int getParityForRXTX() {
+        switch (parity) {
+        case EVEN:
+            return SerialPort.PARITY_EVEN;
+        case MARK:
+            return SerialPort.PARITY_MARK;
+        case NONE:
+            return SerialPort.PARITY_NONE;
+        case ODD:
+            return SerialPort.PARITY_ODD;
+        case SPACE:
+            return SerialPort.PARITY_SPACE;
+        }
+        throw new InvalidParameterException("broken parity");
+    }
+
+    /**
+     * Translates the flow control setting into the matching
+     * {@link SerialPort} constant. The misspelled method name is kept
+     * because the connector calls it.
+     *
+     * @throws InvalidParameterException if no case matches
+     */
+    int getFLowControlForRXTX() {
+        switch (flowControl) {
+        case NONE:
+            return SerialPort.FLOWCONTROL_NONE;
+        case RTSCTS_IN:
+            return SerialPort.FLOWCONTROL_RTSCTS_IN;
+        case RTSCTS_OUT:
+            return SerialPort.FLOWCONTROL_RTSCTS_OUT;
+        case XONXOFF_IN:
+            return SerialPort.FLOWCONTROL_XONXOFF_IN;
+        case XONXOFF_OUT:
+            return SerialPort.FLOWCONTROL_XONXOFF_OUT;
+        }
+        // The message used to say "broken stopbits" (copy-paste slip).
+        throw new InvalidParameterException("broken flowcontrol");
+    }
}
Modified: mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java?view=diff&rev=526184&r1=526183&r2=526184
==============================================================================
--- mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java (original)
+++ mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialConnector.java Fri Apr 6 07:44:58 2007
@@ -1,17 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.mina.transport.serial;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.util.Enumeration;
-import java.util.TooManyListenersException;
-
import gnu.io.CommPortIdentifier;
import gnu.io.PortInUseException;
import gnu.io.SerialPort;
import gnu.io.UnsupportedCommOperationException;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.Enumeration;
+import java.util.TooManyListenersException;
+
import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.IoConnector;
import org.apache.mina.common.TransportType;
import org.apache.mina.common.support.BaseIoConnector;
import org.apache.mina.common.support.DefaultConnectFuture;
@@ -19,96 +38,121 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SerialConnector extends BaseIoConnector
-{
- private Logger log;
-
- public SerialConnector()
- {
- super(new DefaultSerialSessionConfig());
- log=LoggerFactory.getLogger(SerialConnector.class);
- }
-
- @Override
- protected IoServiceListenerSupport getListeners()
- {
+/**
+ * {@link IoConnector} for serial communication transport.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev: 0 $, $Date: 0 $
+ */
+public class SerialConnector extends BaseIoConnector {
+    private final Logger log = LoggerFactory.getLogger(SerialConnector.class);
+
+    /**
+     * Creates a connector, handing a fresh
+     * {@link DefaultSerialSessionConfig} to the superclass.
+     */
+    public SerialConnector() {
+        super(new DefaultSerialSessionConfig());
+    }
+
+ /**
+ * Re-declares the inherited accessor in this class and simply delegates to
+ * the superclass. {@link SerialSession} calls it on the connector to fire
+ * session events.
+ */
+ @Override
+ protected IoServiceListenerSupport getListeners() {
return super.getListeners();
}
- @Override
- protected ConnectFuture doConnect(SocketAddress remoteAddress, SocketAddress localAddress) {
- if( ! (remoteAddress instanceof SerialAddress) )
- {
- throw new IllegalArgumentException("Bad SocketAddress, need a SerialPortAddress");
- }
-
- CommPortIdentifier portId;
- Enumeration portList = CommPortIdentifier.getPortIdentifiers();
-
- SerialAddress portAddress = (SerialAddress) remoteAddress;
-
- // looping around found ports
- while (portList.hasMoreElements())
- {
- portId = (CommPortIdentifier) portList.nextElement();
- if (portId.getPortType() == CommPortIdentifier.PORT_SERIAL)
- {
- if(log.isDebugEnabled())
- {
- log.debug("Serial port discovered : "+portId.getName());
- }
- if (portId.getName().equals(portAddress.getName())) {
- try {
- if(log.isDebugEnabled())
- {
- log.debug("Serial port found : "+portId.getName());
- }
- SerialPort serialPort = (SerialPort) portId.open(
- "Apache MINA", 2000); // TODO: need a parameter for millisec. timeout
-
- serialPort.setSerialPortParams(portAddress.getBauds(),
- portAddress.getDataBitsForRXTX(),portAddress.getStopBitsForRXTX(),
- portAddress.getParityForRXTX());
- serialPort.setFlowControlMode(portAddress.getFLowControlForRXTX());
-
- // TODO : receive threshold
- // serialPort.enableReceiveThreshold(receiveThreshold); /* bytes */
- // TODO : reveive Timeout serialPort.enableReceiveTimeout(10); /* milliseconds */
-
- serialPort.notifyOnDataAvailable(true);
- //serialPort.setLowLatency();
- serialPort.setInputBufferSize(8);
- serialPort.enableReceiveTimeout(10);
-
- ConnectFuture future = new DefaultConnectFuture();
- SerialSession session = new SerialSession(this,portAddress,serialPort);
- session.start();
- future.setSession( session );
- return future;
- } catch (PortInUseException e) {
- if(log.isDebugEnabled())
- log.debug("Port In Use Exception : ",e);
- return DefaultConnectFuture.newFailedFuture(e);
- } catch (UnsupportedCommOperationException e) {
- if(log.isDebugEnabled())
- log.debug("Comm Exception : ",e);
- return DefaultConnectFuture.newFailedFuture(e);
- } catch (IOException e) {
- if(log.isDebugEnabled())
- log.debug("IOException : ",e);
- return DefaultConnectFuture.newFailedFuture(e);
- } catch (TooManyListenersException e) {
- if(log.isDebugEnabled())
- log.debug("TooManyListenersException : ",e);
- return DefaultConnectFuture.newFailedFuture(e);
- }
- }
- }
- }
- return DefaultConnectFuture.newFailedFuture(new RuntimeException("Serial port not found"));
- }
-
- public TransportType getTransportType() {
- return SerialSession.serialTransportType;
- }
+    /**
+     * Looks up the serial port named by {@code remoteAddress}, opens and
+     * configures it, and starts a {@link SerialSession} on it.
+     * <p>
+     * If the port was opened but the session could not be started, the port
+     * is closed again before the failed future is returned.
+     *
+     * @param remoteAddress the port to open; must be a {@link SerialAddress}
+     * @param localAddress  not used by this method
+     * @return a future holding the started session, or a failed future when
+     *         the port is missing, busy or cannot be set up
+     * @throws IllegalArgumentException if {@code remoteAddress} is not a
+     *         {@link SerialAddress}
+     */
+    @Override
+    protected ConnectFuture doConnect(SocketAddress remoteAddress,
+            SocketAddress localAddress) {
+        if (!(remoteAddress instanceof SerialAddress)) {
+            throw new IllegalArgumentException(
+                    "Bad SocketAddress, need a SerialPortAddress");
+        }
+
+        SerialAddress portAddress = (SerialAddress) remoteAddress;
+        Enumeration portList = CommPortIdentifier.getPortIdentifiers();
+
+        // looping around found ports
+        while (portList.hasMoreElements()) {
+            CommPortIdentifier portId = (CommPortIdentifier) portList
+                    .nextElement();
+            if (portId.getPortType() != CommPortIdentifier.PORT_SERIAL) {
+                continue;
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Serial port discovered : " + portId.getName());
+            }
+            if (!portId.getName().equals(portAddress.getName())) {
+                continue;
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Serial port found : " + portId.getName());
+            }
+
+            // Stays null until the port is open, so the failure path knows
+            // whether there is something to release.
+            SerialPort serialPort = null;
+            try {
+                serialPort = initalizePort("Apache MINA", 2000, portId,
+                        portAddress);
+
+                ConnectFuture future = new DefaultConnectFuture();
+                SerialSession session = new SerialSession(this, portAddress,
+                        serialPort);
+                session.start();
+                future.setSession(session);
+                return future;
+            } catch (PortInUseException e) {
+                return connectFailed("Port In Use Exception : ", e, serialPort);
+            } catch (UnsupportedCommOperationException e) {
+                return connectFailed("Comm Exception : ", e, serialPort);
+            } catch (IOException e) {
+                return connectFailed("IOException : ", e, serialPort);
+            } catch (TooManyListenersException e) {
+                return connectFailed("TooManyListenersException : ", e,
+                        serialPort);
+            }
+        }
+        return DefaultConnectFuture.newFailedFuture(new RuntimeException(
+                "Serial port not found"));
+    }
+
+    /**
+     * Logs a connection failure at debug level, closes the port if it had
+     * already been opened, and wraps the cause in a failed future.
+     */
+    private ConnectFuture connectFailed(String message, Exception cause,
+            SerialPort port) {
+        if (log.isDebugEnabled()) {
+            log.debug(message, cause);
+        }
+        if (port != null) {
+            port.close();
+        }
+        return DefaultConnectFuture.newFailedFuture(cause);
+    }
+
+ /**
+ * @return the transport type constant declared by {@link SerialSession}
+ */
+ public TransportType getTransportType() {
+ return SerialSession.serialTransportType;
+ }
+
+    /**
+     * Opens the given port and applies the line settings of
+     * {@code portAddress} and the options of this connector's
+     * {@link SerialSessionConfig}.
+     * <p>
+     * The receive threshold and receive timeout are enabled only when the
+     * configured value is zero or positive, and disabled otherwise. If any
+     * setting is rejected, the port is closed before the exception
+     * propagates.
+     *
+     * @param user        the owner name passed to the port when opening it
+     * @param timeout     how long to wait for the port, in milliseconds
+     * @param portId      the identifier of the port to open
+     * @param portAddress the address carrying the line settings
+     * @return the opened and configured port
+     * @throws UnsupportedCommOperationException if a setting is rejected
+     * @throws PortInUseException if the port cannot be obtained
+     */
+    private SerialPort initalizePort(String user, int timeout,
+            CommPortIdentifier portId, SerialAddress portAddress)
+            throws UnsupportedCommOperationException, PortInUseException {
+        // Use the caller's values; they used to be ignored in favour of
+        // hard-coded copies of the same literals.
+        SerialPort serialPort = (SerialPort) portId.open(user, timeout);
+
+        try {
+            serialPort.setSerialPortParams(portAddress.getBauds(),
+                    portAddress.getDataBitsForRXTX(),
+                    portAddress.getStopBitsForRXTX(),
+                    portAddress.getParityForRXTX());
+
+            serialPort.setFlowControlMode(portAddress.getFLowControlForRXTX());
+
+            serialPort.notifyOnDataAvailable(true);
+            SerialSessionConfig config = (SerialSessionConfig) getSessionConfig();
+
+            if (config.isLowLantecy()) {
+                serialPort.setLowLatency();
+            }
+
+            serialPort.setInputBufferSize(config.getInputBufferSize());
+
+            if (config.getReceiveThreshold() >= 0) {
+                serialPort.enableReceiveThreshold(config.getReceiveThreshold());
+            } else {
+                serialPort.disableReceiveThreshold();
+            }
+
+            if (config.getReceiveTimeout() >= 0) {
+                serialPort.enableReceiveTimeout(config.getReceiveTimeout());
+            } else {
+                serialPort.disableReceiveTimeout();
+            }
+
+            // A trailing enableReceiveThreshold(8) used to follow here and
+            // overrode whatever threshold the configuration asked for.
+            return serialPort;
+        } catch (UnsupportedCommOperationException e) {
+            serialPort.close();
+            throw e;
+        } catch (RuntimeException e) {
+            // e.g. an invalid data bit count in the address
+            serialPort.close();
+            throw e;
+        }
+    }
}
Modified: mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialFilterChain.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialFilterChain.java?view=diff&rev=526184&r1=526183&r2=526184
==============================================================================
--- mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialFilterChain.java (original)
+++ mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialFilterChain.java Fri Apr 6 07:44:58 2007
@@ -1,40 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.mina.transport.serial;
import java.util.Queue;
import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.AbstractIoFilterChain;
+/**
+ * An {@link IoFilterChain} for serial communication transport.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev: 0 $, $Date: 0 $
+ */
public class SerialFilterChain extends AbstractIoFilterChain {
- protected SerialFilterChain(IoSession session) {
- super(session);
- }
-
- @Override
- protected void doClose(IoSession session) throws Exception {
- ((SerialSession)session).closeSerialPort();
- }
-
- @Override
- protected void doWrite(IoSession session, WriteRequest writeRequest) throws Exception {
- SerialSession s=(SerialSession)session;
- Queue<WriteRequest> queue = s.getWriteRequestQueue();
+ /**
+ * Creates the filter chain for a session.
+ *
+ * @param session the session handed to the superclass constructor
+ */
+ protected SerialFilterChain(IoSession session) {
+ super(session);
+ }
+
+ /**
+ * Closes the serial port of the session.
+ *
+ * @param session the session to close; cast to {@link SerialSession}
+ */
+ @Override
+ protected void doClose(IoSession session) throws Exception {
+ ((SerialSession) session).closeSerialPort();
+ }
+
+ /**
+ * Marks the buffer of the write request, appends the request to the
+ * session's write queue and, when the queue held nothing before and the
+ * session's traffic mask is writable, wakes the session's write worker.
+ * <p>
+ * NOTE(review): the inline comment below names SocketIoProcessor.doFlush();
+ * for this transport the reset presumably happens in the serial session's
+ * own write flushing - confirm.
+ *
+ * @param session      the session to write to; cast to {@link SerialSession}
+ * @param writeRequest the request whose message is cast to {@link ByteBuffer}
+ */
+ @Override
+ protected void doWrite(IoSession session, WriteRequest writeRequest)
+ throws Exception {
+ SerialSession s = (SerialSession) session;
+ Queue<WriteRequest> queue = s.getWriteRequestQueue();
- // SocketIoProcessor.doFlush() will reset it after write is finished
+ // SocketIoProcessor.doFlush() will reset it after write is finished
// because the buffer will be passed with messageSent event.
- ( ( ByteBuffer ) writeRequest.getMessage() ).mark();
- synchronized( queue )
- {
- queue.offer( writeRequest );
- if( queue.size() == 1 && session.getTrafficMask().isWritable() )
- {
+ ((ByteBuffer) writeRequest.getMessage()).mark();
+ synchronized (queue) {
+ queue.offer(writeRequest);
+ if (queue.size() == 1 && session.getTrafficMask().isWritable()) {
// Notify serial session worker only when writeRequestQueue was empty.
s.notifyWriteWorker();
}
}
- }
+ }
}
Modified: mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java?view=diff&rev=526184&r1=526183&r2=526184
==============================================================================
--- mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java (original)
+++ mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSession.java Fri Apr 6 07:44:58 2007
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.mina.transport.serial;
import java.io.IOException;
@@ -17,262 +36,271 @@
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionConfig;
import org.apache.mina.common.TransportType;
import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.BaseIoSession;
import org.apache.mina.common.support.DefaultTransportType;
+import org.apache.mina.common.support.SessionIdleStatusChecker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * An {@link IoSession} for serial communication transport.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev: 0 $, $Date: 0 $
+ */
public class SerialSession extends BaseIoSession implements
- SerialPortEventListener {
+ SerialPortEventListener {
- private SerialSessionConfig config;
+ private SerialSessionConfig config;
- private IoHandler ioHandler;
+ private IoHandler ioHandler;
- private IoFilterChain filterChain;
+ private IoFilterChain filterChain;
- private IoService service;
-
- private SerialAddress address;
-
- private final Queue<WriteRequest> writeRequestQueue;
-
- private InputStream inputStream;
-
- private OutputStream outputStream;
-
- private SerialPort port;
-
- private Logger log;
-
-
- public static final TransportType serialTransportType = new DefaultTransportType( "serial communication", false, SerialAddress.class,
- ByteBuffer.class, SerialSessionConfig.class );
-
- SerialSession(IoService service, SerialAddress address, SerialPort port) {
- this.service = service;
- this.ioHandler = service.getHandler();
- this.filterChain = new SerialFilterChain(this);
- this.writeRequestQueue = new LinkedList<WriteRequest>();
- this.port = port;
-
- log = LoggerFactory.getLogger(SerialSession.class);
- }
-
- @Override
- protected void updateTrafficMask() {
- throw new UnsupportedOperationException();
- }
-
- public IoSessionConfig getConfig() {
- return config;
- }
-
- public IoFilterChain getFilterChain() {
- return filterChain;
- }
-
- public IoHandler getHandler() {
- return ioHandler;
- }
-
- public SocketAddress getLocalAddress() {
- return null; // not applicable
- }
-
- public SocketAddress getRemoteAddress() {
- return address;
- }
-
- Queue<WriteRequest> getWriteRequestQueue() {
- return writeRequestQueue;
- }
-
- public int getScheduledWriteMessages() {
- synchronized (writeRequestQueue) {
- return writeRequestQueue.size();
- }
- }
-
- public int getScheduledWriteBytes() {
- int size = 0;
- synchronized (writeRequestQueue) {
- for (Object o : writeRequestQueue) {
- if (o instanceof ByteBuffer) {
- size += ((ByteBuffer) o).remaining();
- }
- }
- }
- return size;
- }
-
- public IoService getService() {
- return service;
- }
-
- public TransportType getTransportType() {
- return serialTransportType;
- }
-
- protected void close0()
- {
- filterChain.fireFilterClose( this );
- }
-
- protected void write0( WriteRequest writeRequest )
- {
- filterChain.fireFilterWrite( this, writeRequest );
- }
-
- /**
- * start handling streams
- *
- * @throws IOException
- * @throws TooManyListenersException
- */
- void start() throws IOException, TooManyListenersException {
- inputStream = port.getInputStream();
- outputStream = port.getOutputStream();
- ReadWorker w = new ReadWorker();
- w.start();
- port.addEventListener(this);
- ((SerialConnector)getService()).getListeners().fireSessionCreated(this);
- }
-
- private Object writeMonitor = new Object();
-
- private WriteWorker writeWorker;
-
- private class WriteWorker extends Thread {
- public void run() {
- while (isConnected() && !isClosing()) {
- flushWrites();
-
- // wait for more data
- synchronized (writeMonitor) {
- try {
- writeMonitor.wait();
- } catch (InterruptedException e) {
- log.error("InterruptedException", e);
- }
- }
- }
- }
- }
-
- private void flushWrites() {
- for (;;) {
- WriteRequest req;
-
- synchronized (writeRequestQueue) {
- req = (WriteRequest) writeRequestQueue.peek();
- }
-
- if (req == null)
- break;
-
- ByteBuffer buf = (ByteBuffer) req.getMessage();
- if (buf.remaining() == 0) {
- synchronized (writeRequestQueue) {
- writeRequestQueue.poll();
- }
- this.increaseWrittenMessages();
-
- buf.reset();
-
- this.getFilterChain().fireMessageSent(this, req);
- continue;
- }
-
- int writtenBytes = buf.remaining();
- try {
- outputStream.write(buf.array());
- buf.position(buf.position()+writtenBytes);
- this.increaseWrittenBytes(writtenBytes);
- } catch (IOException e) {
- this.getFilterChain().fireExceptionCaught(this, e);
- }
- }
- }
-
- void notifyWriteWorker() {
- if (writeWorker == null) {
- writeWorker = new WriteWorker();
- writeWorker.start();
- } else {
- synchronized (writeMonitor) {
- writeMonitor.notifyAll();
- }
- }
- }
-
- private Object readReadyMonitor = new Object();
-
- private class ReadWorker extends Thread {
- @Override
- public void run() {
- while (isConnected() && !isClosing()) {
- synchronized (readReadyMonitor) {
- try {
- readReadyMonitor.wait();
- } catch (InterruptedException e) {
- log.error("InterruptedException", e);
- }
- if(isClosing() || !isConnected())
- break;
- int dataSize;
- try {
- dataSize = inputStream.available();
- byte[] data = new byte[dataSize];
- int readBytes = inputStream.read(data);
-
- if( readBytes > 0 )
- {
- increaseReadBytes( readBytes );
- // TODO : check if it's the good allocation way
- ByteBuffer buf = ByteBuffer.allocate( readBytes );
- buf.put(data,0,readBytes);
- buf.flip();
- getFilterChain().fireMessageReceived( SerialSession.this, buf );
- }
- } catch (IOException e) {
- getFilterChain().fireExceptionCaught(
- SerialSession.this, e);
- }
- }
- }
- }
- }
-
- public void serialEvent(SerialPortEvent evt) {
- if (evt.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
- synchronized (readReadyMonitor) {
- readReadyMonitor.notifyAll();
- }
- }
- }
-
- public void closeSerialPort() {
- try {
- inputStream.close();
- } catch (IOException e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- try {
- outputStream.close();
- } catch (IOException e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
-
- port.close();
- notifyWriteWorker();
- synchronized (readReadyMonitor) {
- readReadyMonitor.notifyAll();
- }
-
- ((SerialConnector)getService()).getListeners().fireSessionDestroyed(this);
- }
+ private IoService service;
+
+ private SerialAddress address;
+
+ private final Queue<WriteRequest> writeRequestQueue;
+
+ private InputStream inputStream;
+
+ private OutputStream outputStream;
+
+ private SerialPort port;
+
+ private Logger log;
+
+ public static final TransportType serialTransportType = new DefaultTransportType(
+ "serial communication", false, SerialAddress.class,
+ ByteBuffer.class, SerialSessionConfig.class);
+
+    /**
+     * Creates a session bound to an already opened serial port.
+     *
+     * @param service the owning service; its handler becomes the session
+     *                handler
+     * @param address the address of the port, returned by
+     *                {@link #getRemoteAddress()}
+     * @param port    the opened port the session reads from and writes to
+     */
+    SerialSession(IoService service, SerialAddress address, SerialPort port) {
+        this.service = service;
+        this.ioHandler = service.getHandler();
+        this.filterChain = new SerialFilterChain(this);
+        this.writeRequestQueue = new LinkedList<WriteRequest>();
+        this.port = port;
+        // The address parameter used to be dropped, which left
+        // getRemoteAddress() returning null.
+        this.address = address;
+        // NOTE(review): the config field is not assigned here either -
+        // confirm where the session configuration is meant to come from.
+
+        log = LoggerFactory.getLogger(SerialSession.class);
+    }
+
+ /**
+ * Not supported by this session.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ protected void updateTrafficMask() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @return the session configuration field. NOTE(review): the constructor
+ * does not assign this field - confirm it is set somewhere.
+ */
+ public IoSessionConfig getConfig() {
+ return config;
+ }
+
+ /** Returns the filter chain created for this session in the constructor. */
+ public IoFilterChain getFilterChain() {
+ return filterChain;
+ }
+
+ /** Returns the handler taken from the owning service at construction time. */
+ public IoHandler getHandler() {
+ return ioHandler;
+ }
+
+ /** Always returns {@code null}: a local address is not applicable here. */
+ public SocketAddress getLocalAddress() {
+ return null; // not applicable
+ }
+
+ /** Returns the serial address stored in the {@code address} field. */
+ public SocketAddress getRemoteAddress() {
+ return address;
+ }
+
+ /**
+ * Returns the live queue of pending write requests (not a copy).
+ * Code in this class synchronizes on the queue before accessing it.
+ */
+ Queue<WriteRequest> getWriteRequestQueue() {
+ return writeRequestQueue;
+ }
+
+ /**
+  * Counts the write requests currently waiting in the queue.
+  *
+  * @return the queue size, read while holding the queue's lock
+  */
+ public int getScheduledWriteMessages() {
+     final int pending;
+     synchronized (writeRequestQueue) {
+         pending = writeRequestQueue.size();
+     }
+     return pending;
+ }
+
+ public int getScheduledWriteBytes() {
+ int size = 0;
+ synchronized (writeRequestQueue) {
+ for (Object o : writeRequestQueue) {
+ if (o instanceof ByteBuffer) {
+ size += ((ByteBuffer) o).remaining();
+ }
+ }
+ }
+ return size;
+ }
+
+ /** Returns the service passed to the constructor. */
+ public IoService getService() {
+ return service;
+ }
+
+ /** Returns the shared {@link #serialTransportType} descriptor. */
+ public TransportType getTransportType() {
+ return serialTransportType;
+ }
+
+ /** Passes the close request to the filter chain via {@code fireFilterClose}. */
+ protected void close0() {
+ filterChain.fireFilterClose(this);
+ }
+
+ /**
+ * Passes a write request to the filter chain via {@code fireFilterWrite}.
+ *
+ * @param writeRequest the request to send
+ */
+ protected void write0(WriteRequest writeRequest) {
+ filterChain.fireFilterWrite(this, writeRequest);
+ }
+
+ /**
+  * Begins servicing the port: fetches its streams, launches the read
+  * worker, subscribes this session to port events, registers it with the
+  * idle status checker and announces the new session to the connector's
+  * listeners.
+  *
+  * @throws IOException raised while obtaining the port streams
+  * @throws TooManyListenersException raised when registering as event listener
+  */
+ void start() throws IOException, TooManyListenersException {
+     inputStream = port.getInputStream();
+     outputStream = port.getOutputStream();
+     new ReadWorker().start();
+     port.addEventListener(this);
+     SessionIdleStatusChecker.getInstance().addSession(this);
+     SerialConnector connector = (SerialConnector) getService();
+     connector.getListeners().fireSessionCreated(this);
+ }
+
+ private Object writeMonitor = new Object();
+
+ private WriteWorker writeWorker;
+
+ /**
+  * Background thread that flushes queued write requests, then sleeps on
+  * {@code writeMonitor} until it is notified again. It stops when the
+  * session is closing or no longer connected, or when it is interrupted.
+  */
+ private class WriteWorker extends Thread {
+     @Override
+     public void run() {
+         while (isConnected() && !isClosing()) {
+             flushWrites();
+
+             // wait for more data
+             synchronized (writeMonitor) {
+                 // Re-check the session state while holding the monitor: a
+                 // close notification sent after the loop test but before
+                 // wait() would otherwise be missed and park this thread
+                 // forever.
+                 if (!isConnected() || isClosing()) {
+                     break;
+                 }
+                 // NOTE(review): a request queued between flushWrites()
+                 // returning and this block being entered is still not seen
+                 // until the next notification; closing that gap needs the
+                 // enqueue-side locking, which is not visible here.
+                 try {
+                     writeMonitor.wait();
+                 } catch (InterruptedException e) {
+                     log.error("InterruptedException", e);
+                     // Keep the interrupt visible to callers and stop,
+                     // instead of silently resuming the loop.
+                     Thread.currentThread().interrupt();
+                     break;
+                 }
+             }
+         }
+     }
+ }
+
+ /**
+  * Writes queued requests to the port's output stream, oldest first, until
+  * the queue is empty or a write fails.
+  *
+  * A request whose buffer has no bytes left is removed from the queue,
+  * counted as a written message and reported through
+  * {@code fireMessageSent}. On an {@link IOException} the error is reported
+  * through {@code fireExceptionCaught} and flushing stops; the failed
+  * request stays at the head of the queue.
+  */
+ private void flushWrites() {
+     for (;;) {
+         WriteRequest req;
+
+         synchronized (writeRequestQueue) {
+             req = writeRequestQueue.peek();
+         }
+
+         if (req == null) {
+             break;
+         }
+
+         ByteBuffer buf = (ByteBuffer) req.getMessage();
+         if (buf.remaining() == 0) {
+             synchronized (writeRequestQueue) {
+                 writeRequestQueue.poll();
+             }
+             this.increaseWrittenMessages();
+
+             // NOTE(review): presumably rewinds to a mark set when the
+             // request was queued - confirm the filter chain marks the buffer.
+             buf.reset();
+
+             this.getFilterChain().fireMessageSent(this, req);
+             continue;
+         }
+
+         int writtenBytes = buf.remaining();
+         try {
+             // Send only the unread window of the buffer. Writing the whole
+             // backing array emitted bytes outside position..limit while
+             // only remaining() bytes were accounted for.
+             outputStream.write(buf.array(),
+                     buf.arrayOffset() + buf.position(), writtenBytes);
+             buf.position(buf.position() + writtenBytes);
+             this.increaseWrittenBytes(writtenBytes);
+         } catch (IOException e) {
+             this.getFilterChain().fireExceptionCaught(this, e);
+             // Stop here: the request is still at the head of the queue, so
+             // looping again would retry a failing port in a tight,
+             // never-ending loop.
+             break;
+         }
+     }
+ }
+
+ /**
+  * Signals that write requests may be pending: starts the write worker on
+  * first use, otherwise wakes the existing one.
+  */
+ void notifyWriteWorker() {
+     // Creation and notification share the monitor so that two concurrent
+     // callers cannot both see a null worker and start two threads.
+     synchronized (writeMonitor) {
+         if (writeWorker == null) {
+             writeWorker = new WriteWorker();
+             writeWorker.start();
+         } else {
+             writeMonitor.notifyAll();
+         }
+     }
+ }
+
+ private Object readReadyMonitor = new Object();
+
+ /**
+  * Background thread that sleeps on {@code readReadyMonitor} and, each time
+  * it is woken, reads whatever the port's input stream reports as available
+  * and forwards it through {@code fireMessageReceived}. I/O errors are
+  * reported through {@code fireExceptionCaught}. The thread stops when the
+  * session is closing or no longer connected, or when it is interrupted.
+  */
+ private class ReadWorker extends Thread {
+     @Override
+     public void run() {
+         while (isConnected() && !isClosing()) {
+             synchronized (readReadyMonitor) {
+                 // Re-check the session state while holding the monitor: a
+                 // close notification sent after the loop test but before
+                 // wait() would otherwise be missed and park this thread
+                 // forever.
+                 if (isClosing() || !isConnected()) {
+                     break;
+                 }
+                 try {
+                     readReadyMonitor.wait();
+                 } catch (InterruptedException e) {
+                     log.error("InterruptedException", e);
+                     // Keep the interrupt visible and stop, instead of
+                     // carrying on as if data had arrived.
+                     Thread.currentThread().interrupt();
+                     break;
+                 }
+                 if (isClosing() || !isConnected()) {
+                     break;
+                 }
+                 // NOTE(review): reading and dispatching still happen while
+                 // the monitor is held, as before; serialEvent() callers
+                 // block for that long.
+                 try {
+                     int dataSize = inputStream.available();
+                     if (dataSize <= 0) {
+                         // Woken with nothing to read: skip the zero-length
+                         // read and go back to waiting.
+                         continue;
+                     }
+                     byte[] data = new byte[dataSize];
+                     int readBytes = inputStream.read(data);
+
+                     if (readBytes > 0) {
+                         increaseReadBytes(readBytes);
+                         // TODO : check if it's the good allocation way
+                         ByteBuffer buf = ByteBuffer.allocate(readBytes);
+                         buf.put(data, 0, readBytes);
+                         buf.flip();
+                         getFilterChain().fireMessageReceived(
+                                 SerialSession.this, buf);
+                     }
+                 } catch (IOException e) {
+                     getFilterChain().fireExceptionCaught(
+                             SerialSession.this, e);
+                 }
+             }
+         }
+     }
+ }
+
+ /**
+  * Port event callback. A {@code DATA_AVAILABLE} event wakes every thread
+  * waiting on {@code readReadyMonitor}; all other event types are ignored.
+  *
+  * @param evt the event delivered by the serial port
+  */
+ public void serialEvent(SerialPortEvent evt) {
+     if (evt.getEventType() != SerialPortEvent.DATA_AVAILABLE) {
+         return;
+     }
+     synchronized (readReadyMonitor) {
+         readReadyMonitor.notifyAll();
+     }
+ }
+
+ /**
+  * Releases the serial port: closes both streams (failures are handed to
+  * the {@code ExceptionMonitor}), closes the port, wakes any thread waiting
+  * on the write or read monitor so it can observe the session state, and
+  * notifies the connector's listeners that the session is destroyed.
+  */
+ public void closeSerialPort() {
+     // The streams are only set by start(); guard so that closing a session
+     // that never started does not fail before the port is released.
+     if (inputStream != null) {
+         try {
+             inputStream.close();
+         } catch (IOException e) {
+             ExceptionMonitor.getInstance().exceptionCaught(e);
+         }
+     }
+     if (outputStream != null) {
+         try {
+             outputStream.close();
+         } catch (IOException e) {
+             ExceptionMonitor.getInstance().exceptionCaught(e);
+         }
+     }
+
+     port.close();
+
+     // Wake a waiting write worker directly. Going through
+     // notifyWriteWorker() would create and start a brand-new worker during
+     // teardown when none exists yet.
+     synchronized (writeMonitor) {
+         writeMonitor.notifyAll();
+     }
+     synchronized (readReadyMonitor) {
+         readReadyMonitor.notifyAll();
+     }
+
+     ((SerialConnector) getService()).getListeners().fireSessionDestroyed(
+             this);
+ }
}
Modified: mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSessionConfig.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSessionConfig.java?view=diff&rev=526184&r1=526183&r2=526184
==============================================================================
--- mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSessionConfig.java (original)
+++ mina/sandbox/jvermillard/serial/src/main/java/org/apache/mina/transport/serial/SerialSessionConfig.java Fri Apr 6 07:44:58 2007
@@ -1,14 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.mina.transport.serial;
import org.apache.mina.common.IoSessionConfig;
+/**
+ * An {@link IoSessionConfig} for serial transport type.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev: 0 $, $Date: 0 $
+ */
public interface SerialSessionConfig extends IoSessionConfig {
-
- int getInputBufferSize();
-
- void setInputBufferSize(int bufferSize);
-
- boolean isLowLantecy();
-
- void setLowLatency(boolean lowLatency);
+
+ /**
+ * Returns the configured input buffer size.
+ * NOTE(review): the unit (presumably bytes) is not established here - confirm.
+ */
+ int getInputBufferSize();
+
+ /**
+ * Sets the input buffer size.
+ *
+ * @param bufferSize the new input buffer size
+ */
+ void setInputBufferSize(int bufferSize);
+
+ /**
+ * Returns the low-latency flag set through {@link #setLowLatency(boolean)}.
+ * NOTE(review): the method name misspells "Latency"; it is public API, so
+ * renaming it would break implementors and callers - flagged only.
+ */
+ boolean isLowLantecy();
+
+ /**
+ * Sets the low-latency flag.
+ *
+ * @param lowLatency the new value of the flag
+ */
+ void setLowLatency(boolean lowLatency);
+
+ /**
+ * Returns the receive threshold, in bytes.
+ * NOTE(review): presumably maps to the serial port's receive-threshold
+ * setting - confirm against the implementation.
+ */
+ int getReceiveThreshold();
+
+ /**
+ * Sets the receive threshold.
+ *
+ * @param bytes the threshold, in bytes
+ */
+ void setReceiveThreshold(int bytes);
+
+ /**
+ * Returns the receive timeout, in milliseconds.
+ * NOTE(review): presumably maps to the serial port's receive-timeout
+ * setting - confirm against the implementation.
+ */
+ int getReceiveTimeout();
+
+ /**
+ * Sets the receive timeout.
+ *
+ * @param milliseconds the timeout, in milliseconds
+ */
+ void setReceiveTimeout(int milliseconds);
}