You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by ap...@apache.org on 2011/06/30 13:29:32 UTC
svn commit: r1141485 - in
/mina/branches/3.0/core/src/main/java/org/apache/mina: IoFilter.java
service/client/AbstractIoClient.java transport/tcp/NioSelectorProcessor.java
Author: apaliwal
Date: Thu Jun 30 11:29:31 2011
New Revision: 1141485
URL: http://svn.apache.org/viewvc?rev=1141485&view=rev
Log:
Formatting/Logging changes, added Lock usage instead of synchronized, added exceptionCaught API in IoFilter
Modified:
mina/branches/3.0/core/src/main/java/org/apache/mina/IoFilter.java
mina/branches/3.0/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
Modified: mina/branches/3.0/core/src/main/java/org/apache/mina/IoFilter.java
URL: http://svn.apache.org/viewvc/mina/branches/3.0/core/src/main/java/org/apache/mina/IoFilter.java?rev=1141485&r1=1141484&r2=1141485&view=diff
==============================================================================
--- mina/branches/3.0/core/src/main/java/org/apache/mina/IoFilter.java (original)
+++ mina/branches/3.0/core/src/main/java/org/apache/mina/IoFilter.java Thu Jun 30 11:29:31 2011
@@ -96,4 +96,13 @@ public interface IoFilter {
* @throws Exception Exception If an error occurs while processing
*/
void messageReceived(IoSession session, Object message) throws Exception;
+
+ /**
+ * Invoked when an exception occurs while executing the method
+ *
+ * @param session {@link IoSession} associated with invocation
+ * @param cause Real {@link Throwable} which broke the normal chain processing
+ * @throws Exception If an error occurs while processing
+ */
+ void exceptionCaught(IoSession session, Throwable cause) throws Exception;
}
\ No newline at end of file
Modified: mina/branches/3.0/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java
URL: http://svn.apache.org/viewvc/mina/branches/3.0/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java?rev=1141485&r1=1141484&r2=1141485&view=diff
==============================================================================
--- mina/branches/3.0/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java (original)
+++ mina/branches/3.0/core/src/main/java/org/apache/mina/service/client/AbstractIoClient.java Thu Jun 30 11:29:31 2011
@@ -42,51 +42,34 @@ public class AbstractIoClient extends Ab
}
@Override
- public Map<Long, IoSession> getManagedSessions()
- {
- // TODO Auto-generated method stub
+ public Map<Long, IoSession> getManagedSessions() {
return null;
}
@Override
- public void addListener( IoServiceListener listener )
- {
- // TODO Auto-generated method stub
-
+ public void addListener( IoServiceListener listener ) {
}
@Override
- public void removeListener( IoServiceListener listener )
- {
- // TODO Auto-generated method stub
-
+ public void removeListener( IoServiceListener listener ) {
}
@Override
- public long getConnectTimeoutMillis()
- {
- // TODO Auto-generated method stub
+ public long getConnectTimeoutMillis() {
return 0;
}
@Override
- public void setConnectTimeoutMillis( long connectTimeoutInMillis )
- {
- // TODO Auto-generated method stub
-
+ public void setConnectTimeoutMillis( long connectTimeoutInMillis ) {
}
@Override
- public ConnectFuture connect( SocketAddress remoteAddress )
- {
- // TODO Auto-generated method stub
+ public ConnectFuture connect( SocketAddress remoteAddress ) {
return null;
}
@Override
- public ConnectFuture connect( SocketAddress remoteAddress, SocketAddress localAddress )
- {
- // TODO Auto-generated method stub
+ public ConnectFuture connect( SocketAddress remoteAddress, SocketAddress localAddress ) {
return null;
}
Modified: mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
URL: http://svn.apache.org/viewvc/mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java?rev=1141485&r1=1141484&r2=1141485&view=diff
==============================================================================
--- mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java (original)
+++ mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java Thu Jun 30 11:29:31 2011
@@ -33,6 +33,8 @@ import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.mina.IoServer;
import org.apache.mina.IoService;
@@ -62,7 +64,7 @@ public class NioSelectorProcessor implem
private SelectorStrategy strategy;
- private Logger log;
+ private static final Logger LOGGER = LoggerFactory.getLogger(NioSelectorProcessor.class);
private Map<SocketAddress, ServerSocketChannel> serverSocketChannels = new ConcurrentHashMap<SocketAddress, ServerSocketChannel>();
@@ -89,9 +91,11 @@ public class NioSelectorProcessor implem
private Selector selector;
+ // Lock for Selector worker, using default. can look into fairness later
+ private Lock workerLock = new ReentrantLock();
+
public NioSelectorProcessor(String name, SelectorStrategy strategy) {
this.strategy = strategy;
- this.log = LoggerFactory.getLogger("SelectorProcessor[" + name + "]");
// TODO : configurable parameter
readBuffer = ByteBuffer.allocate(1024);
@@ -103,22 +107,24 @@ public class NioSelectorProcessor implem
* @param serverChannel
*/
private void add(ServerSocketChannel serverChannel, IoServer server) {
- log.debug("adding a server channel {} for server {}", serverChannel, server);
+ LOGGER.debug("adding a server channel {} for server {}", serverChannel, server);
serversToAdd.add(new Object[] { serverChannel, server });
wakeupWorker();
}
- private Object workerLock = new Object();
-
private SelectorWorker worker = null;
private void wakeupWorker() {
- synchronized (workerLock) {
+ workerLock.lock();
+ try {
if (worker == null) {
worker = new SelectorWorker();
worker.start();
}
+ } finally {
+ workerLock.unlock();
}
+
if (selector != null) {
selector.wakeup();
}
@@ -140,14 +146,14 @@ public class NioSelectorProcessor implem
channel.socket().close();
channel.close();
serverSocketChannels.remove(channel);
- log.debug("removing a server channel " + channel);
+ LOGGER.debug("removing a server channel " + channel);
serversToRemove.add(channel);
wakeupWorker();
}
@Override
public void createSession(IoService service, Object clientSocket) {
- log.debug("create session");
+ LOGGER.debug("create session");
SocketChannel socketChannel = (SocketChannel) clientSocket;
NioTcpSession session = new NioTcpSession((NioTcpServer) service, socketChannel,
strategy.getSelectorForNewSession(this));
@@ -156,7 +162,7 @@ public class NioSelectorProcessor implem
try {
socketChannel.configureBlocking(false);
} catch (IOException e) {
- log.error("Unexpected exception, while configuring socket as non blocking", e);
+ LOGGER.error("Unexpected exception, while configuring socket as non blocking", e);
}
// TODO : event session created
@@ -180,12 +186,12 @@ public class NioSelectorProcessor implem
@Override
public void run() {
if (selector == null) {
- log.debug("opening a new selector");
+ LOGGER.debug("opening a new selector");
try {
selector = Selector.open();
} catch (IOException e) {
- log.error("IOException while opening a new Selector", e);
+ LOGGER.error("IOException while opening a new Selector", e);
}
}
@@ -198,7 +204,7 @@ public class NioSelectorProcessor implem
SelectionKey key = serverKey.remove(channel);
if (key == null) {
- log.error("The server socket was already removed of the selector");
+ LOGGER.error("The server socket was already removed of the selector");
} else {
key.cancel();
}
@@ -243,9 +249,9 @@ public class NioSelectorProcessor implem
}
}
- log.debug("selecting...");
+ LOGGER.debug("selecting...");
int readyCount = selector.select(SELECT_TIMEOUT);
- log.debug("... done selecting : {}", readyCount);
+ LOGGER.debug("... done selecting : {}", readyCount);
if (readyCount > 0) {
@@ -262,14 +268,14 @@ public class NioSelectorProcessor implem
selector.selectedKeys().remove(key);
if (key.isReadable()) {
- log.debug("readable client {}", key);
+ LOGGER.debug("readable client {}", key);
NioTcpSession session = (NioTcpSession) key.attachment();
SocketChannel channel = session.getSocketChannel();
int readCount = channel.read(readBuffer);
- log.debug("read {} bytes", readCount);
+ LOGGER.debug("read {} bytes", readCount);
if (readCount < 0) {
// session closed by the remote peer
- log.debug("session closed by the remote peer");
+ LOGGER.debug("session closed by the remote peer");
sessionsToClose.add(session);
} else {
// we have read some data
@@ -280,7 +286,7 @@ public class NioSelectorProcessor implem
}
if (key.isWritable()) {
- log.debug("writable session : {}", key.attachment());
+ LOGGER.debug("writable session : {}", key.attachment());
NioTcpSession session = (NioTcpSession) key.attachment();
// write from the session write queue
WriteQueue queue = session.getWriteQueue();
@@ -292,7 +298,7 @@ public class NioSelectorProcessor implem
}
ByteBuffer buf = (ByteBuffer) wreq.getMessage();
int wrote = session.getSocketChannel().write(buf);
- log.debug("wrote {} bytes to {}", wrote, session);
+ LOGGER.debug("wrote {} bytes to {}", wrote, session);
if (buf.remaining() == 0) {
// completed write request, let's remove
// it
@@ -313,12 +319,12 @@ public class NioSelectorProcessor implem
// Selector for reads and another for the writes
SelectionKey readKey = sessionReadKey.get(session);
if (readKey != null) {
- log.debug("registering key for only reading");
+ LOGGER.debug("registering key for only reading");
SelectionKey mykey = session.getSocketChannel().register(selector,
SelectionKey.OP_READ, session);
sessionReadKey.put(session, mykey);
} else {
- log.debug("cancel key for writing");
+ LOGGER.debug("cancel key for writing");
session.getSocketChannel().keyFor(selector).cancel();
}
}
@@ -326,12 +332,12 @@ public class NioSelectorProcessor implem
}
if (key.isAcceptable()) {
- log.debug("acceptable new client {}", key);
+ LOGGER.debug("acceptable new client {}", key);
ServerSocketChannel serverSocket = (ServerSocketChannel) ((Object[]) key.attachment())[0];
IoServer server = (IoServer) (((Object[]) key.attachment())[1]);
// accepted connection
SocketChannel newClientChannel = serverSocket.accept();
- log.debug("client accepted");
+ LOGGER.debug("client accepted");
// and give it's to the strategy
strategy.getSelectorForNewSession(NioSelectorProcessor.this).createSession(server,
newClientChannel);
@@ -359,15 +365,18 @@ public class NioSelectorProcessor implem
}
}
} catch (IOException e) {
- log.error("IOException while selecting selector", e);
+ LOGGER.error("IOException while selecting selector", e);
}
// stop the worker if needed
- synchronized (workerLock) {
+ workerLock.lock();
+ try {
if (selector.keys().isEmpty()) {
worker = null;
break;
}
+ }finally {
+ workerLock.unlock();
}
}
}
@@ -375,7 +384,7 @@ public class NioSelectorProcessor implem
@Override
public void flush(IoSession session) {
- log.debug("scheduling session {} for writing", session.toString());
+ LOGGER.debug("scheduling session {} for writing", session.toString());
// add the session to the list of session to be registered for writing
// wake the selector
flushingSessions.add((NioTcpSession) session);