You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by co...@apache.org on 2008/06/14 17:36:49 UTC
svn commit: r667817 - in
/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net:
SelectorCallback.java SelectorThread.java SelectorThreadNio.java
Author: costin
Date: Sat Jun 14 08:36:49 2008
New Revision: 667817
URL: http://svn.apache.org/viewvc?rev=667817&view=rev
Log:
Selector abstraction; the APR version is not ready yet (it still has bugs, but it seems a bit faster, so it is worth finishing).
Modified:
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java
Modified: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java?rev=667817&r1=667816&r2=667817&view=diff
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java (original)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java Sat Jun 14 08:36:49 2008
@@ -18,6 +18,8 @@
import java.io.IOException;
import java.nio.channels.Channel;
+import org.apache.tomcat.util.net.SelectorThread.SelectorData;
+
/**
* Notiy user code of events. All methods are called from the selector thread,
* they should not block. The reason is to allow parsing and non-blocking
@@ -30,16 +32,11 @@
* ( older version used long - but non-blocking connect needs a second param )
*/
public class SelectorCallback {
- protected SelectorThread.SelectorData selectorData = new SelectorThread.SelectorData(this);
-
- public SelectorThread getSelector() {
- return selectorData.sel;
- }
/**
* Called when the protocol is connected.
*/
- public void connected(SelectorThread selThread)
+ public void connected(SelectorData selThread)
throws IOException {
}
@@ -47,27 +44,34 @@
* It is possible to write data.
* For both read and write - re-enable interest if you want more data.
*/
- public void dataWriteable(SelectorThread selThread) throws IOException {
+ public void dataWriteable(SelectorData selThread) throws IOException {
}
/**
* Data available for read.
* For both read and write - re-enable interest if you want more data.
*/
- public void dataReceived(SelectorThread selThread) throws IOException {
+ public void dataReceived(SelectorData selThread) throws IOException {
}
/**
* nextTimeEvent reached.
*/
- public void timeEvent(SelectorThread selThread) {
+ public void timeEvent(SelectorData selThread) {
+ }
+
+ /**
+ * @throws IOException
+ *
+ */
+ public void ioThreadRun(SelectorData selThread) throws IOException {
}
/**
* Close was detected, or an unhandled exception happened while processing
* this callback.
*/
- public void channelClosed(SelectorThread selThread, Throwable ex) {
+ public void channelClosed(SelectorData selThread, Throwable ex) {
}
/**
@@ -79,7 +83,7 @@
* TODO: is there any case where something else besides registering read
* interest on the new connection is needed ? Maybe it could read some data ?
*/
- public SelectorCallback connectionAccepted(SelectorThread selThread,
+ public SelectorCallback connectionAccepted(SelectorData selThread,
Channel sockC) {
return null;
}
Modified: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java?rev=667817&r1=667816&r2=667817&view=diff
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java (original)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java Sat Jun 14 08:36:49 2008
@@ -20,8 +20,6 @@
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
-import org.apache.tomcat.util.buf.ByteChunk;
-
/**
* Abstract NIO/APR to avoid some of the complexity and allow more code
* sharing and experiments.
@@ -44,17 +42,19 @@
/**
* This is stored as the attachment in the selector.
*/
- static class SelectorData {
- SelectorData(SelectorCallback selectorCallback) {
- this.callback = selectorCallback;
+ public static class SelectorData {
+ public SelectorData(SelectorThread sel) {
+ this.sel = sel;
}
// APR long is wrapped in a ByteChannel as well - with few other longs.
Channel channelData;
-
- SelectorThread sel;
Object selKey;
- SelectorCallback callback;
+
+ public SelectorThread sel;
+ public SelectorCallback callback;
+
+ SelectorCallback pendingCallback;
// Current interest, used internally to avoid waking up if no change
// Also used for connect and accept.
@@ -72,14 +72,16 @@
// Saved to allow debug messages for bad interest/looping
int lastReadResult;
+ int zeroReads = 0;
int lastWriteResult;
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("SelData: ")
.append(writeInterest ? "W/" : "")
- .append(readInterest ? "R/" : "").append(selKey).append("/")
+ .append(readInterest ? "R/" : "").append("/")
.append(channelData);
+ //append(selKey).
return sb.toString();
}
}
@@ -116,16 +118,16 @@
* @param sc
* @param nextTimer time to call the timeEvent() callback
*/
- public void setTimerEventTime(SelectorCallback sc, long nextTimer) {
- sc.selectorData.nextTimeEvent = nextTimer;
+ public void setTimerEventTime(SelectorData selectorData, long nextTimer) {
+ selectorData.nextTimeEvent = nextTimer;
}
- public int readNonBlocking(SelectorCallback sc, ByteBuffer bb)
+ public int readNonBlocking(SelectorData sc, ByteBuffer bb)
throws IOException {
return 0;
}
- public int writeNonBlocking(SelectorCallback sc, ByteBuffer reqBuf)
+ public int writeNonBlocking(SelectorData sc, ByteBuffer reqBuf)
throws IOException {
return 0;
}
@@ -133,7 +135,7 @@
/**
*
*/
- public int close(SelectorCallback sc) throws IOException {
+ public int close(SelectorData sc) throws IOException {
return 0;
}
@@ -149,6 +151,9 @@
{
}
+ public void ioThreadRun(SelectorData sdata) throws IOException {
+ }
+
/**
* For use with daemon tools, inetd - the server socket will be created
* externally and passed as a file descriptor. This is needed to run on
@@ -160,13 +165,20 @@
/**
* Change the callback associated with the socket.
*/
- public void updateCallback(SelectorCallback old, SelectorCallback sc) {
+ public void updateCallback(SelectorData sdata, SelectorCallback old, SelectorCallback sc) {
}
- public void writeInterest(SelectorCallback sc, boolean writeInterest) {
+ public void writeInterest(SelectorData sc, boolean writeInterest) {
}
- public void readInterest(SelectorCallback sc, boolean readInterest) {
+ public void readInterest(SelectorData sc, boolean readInterest) throws IOException {
+ }
+
+ public int getPort(SelectorData sd, boolean remote) {
+ return 0;
}
+ public InetAddress getAddress(SelectorData sd, boolean remote) {
+ return null;
+ }
}
\ No newline at end of file
Modified: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java?rev=667817&r1=667816&r2=667817&view=diff
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java (original)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java Sat Jun 14 08:36:49 2008
@@ -48,14 +48,18 @@
*/
public class SelectorThreadNio extends SelectorThread implements Runnable {
- static Logger log = Logger.getLogger("SelectorThreadNio");
+ static Logger log = Logger.getLogger("SelNio");
Selector selector;
- ArrayList<SelectorCallback> readInterest = new ArrayList<SelectorCallback>();
- ArrayList<SelectorCallback> writeInterest = new ArrayList<SelectorCallback>();
- ArrayList<SelectorCallback> connectAcceptInterest =
- new ArrayList<SelectorCallback>();
+ ArrayList<SelectorData> readInterest = new ArrayList<SelectorData>();
+ ArrayList<SelectorData> writeInterest = new ArrayList<SelectorData>();
+ ArrayList<SelectorData> connectAcceptInterest =
+ new ArrayList<SelectorData>();
+ ArrayList<SelectorData> updateCallback =
+ new ArrayList<SelectorData>();
+ ArrayList<SelectorData> runInterest =
+ new ArrayList<SelectorData>();
AtomicInteger opened = new AtomicInteger();
AtomicInteger closed = new AtomicInteger();
@@ -63,7 +67,7 @@
// actives are also stored in the Selector. This is only updated in the main
// thread
- ArrayList<SelectorCallback> active = new ArrayList<SelectorCallback>();
+ ArrayList<SelectorData> active = new ArrayList<SelectorData>();
boolean debug = false;
boolean running = true;
@@ -74,7 +78,7 @@
// Normally select will wait for the next time event - if it's
// too far in future, maxSleep will override it.
- private long maxSleep = 1000;
+ private long maxSleep = 10000;
// Never sleep less than minSleep. This defines the resulution for
// time events.
@@ -99,11 +103,6 @@
}
}
- public void setDaemon(boolean d) {
- this.daemon = d;
-
- }
-
public void setName(String n) {
selectorThread.setName(n);
Registry registry = Registry.getRegistry(null, null);
@@ -145,12 +144,17 @@
public void stop() {
running = false;
- log.info("Selector thread stop " + this);
+ if (debug) {
+ log.info("Selector thread stop " + this);
+ }
selector.wakeup();
}
public void run() {
- log.info("Selector thread start " + this);
+ int sloops = 0;
+ if (debug) {
+ log.info("Start NIO thread, daemon=" + daemon);
+ }
while (running) {
// if we want timeouts - set here.
try {
@@ -159,74 +163,135 @@
processPending();
long now = System.currentTimeMillis();
- if (nextWakeup > 0 && nextWakeup < now) {
+ if (nextWakeup < now) {
// We don't want to iterate on every I/O
updateSleepTimeAndProcessTimeouts(now);
}
int selected = selector.select(sleepTime);
lastWakeup = System.currentTimeMillis();
-
+ if (debug && selected == 0) {
+ long delta = lastWakeup - now;
+ if (delta < maxSleep - 1000) { // short wakeup
+ log.info("Wakeup " + selected + " " + (delta));
+ }
+ }
+ if (lastWakeup - now < 10 && selected == 0) {
+ if (sloops > 50) {
+ sloops = 0;
+ log.severe("Looping !");
+ }
+ sloops++;
+ }
// handle events for existing req first.
if (selected != 0) {
+ sloops = 0;
Set<SelectionKey> sel = selector.selectedKeys();
Iterator<SelectionKey> i = sel.iterator();
while (i.hasNext()) {
SelectionKey sk = i.next();
+ i.remove();
// avoid dup - disable the interest
- // TODO: is this really needed ?
- int readyOps = sk.readyOps();
- sk.interestOps(sk.interestOps() & ~readyOps);
+ // TODO: is this really needed ?
+ boolean valid = sk.isValid();
+ int readyOps = (valid) ? sk.readyOps() : 0;
+ if (debug) {
+ log.info("Wakeup " + selected + " " + (lastWakeup - now) +
+ " ready: " + readyOps + " " +
+ sk.isValid() + " " + sk);
+ }
+
+ //sk.interestOps(sk.interestOps() & ~readyOps);
// Find the request receiving the notification
+
SelectorData sdata = (SelectorData) sk.attachment();
+ //synchronized (sdata) {
+ // either that or updateCallback in IO/Thread
SelectorCallback cstate = sdata.callback;
- if (debug) {
- log.info("SelectorThread: selected " + cstate + " " + readyOps);
+
+ //}
+ //checkChannelKey(cstate);
+ if (sdata.selKey != sk || sdata.channelData != sk.channel()) {
+ sdata.selKey = sk;
+ sdata.channelData = sk.channel();
}
if (sk.isValid() && sk.isAcceptable()) {
handleAccept(cstate, sk);
+ if (debug) {
+ log.info("Wakeup done, accept" + selected
+ + " " + (lastWakeup - now)
+ + " ready: " + readyOps + " "
+ + sk.readyOps() + " " + sk);
+ }
continue;
}
SocketChannel sc = (SocketChannel) sk.channel();
if (!sk.isValid()) {
if (debug) {
- log.info("SelectorThread: !isValid, closed socket " + cstate);
+ log.info("!isValid, closed socket " + cstate);
}
- close(sk, cstate, sc, null);
+ close(sdata, sk, cstate, sc, null, true);
continue;
}
try {
// callbacks
if (sk.isValid() && sk.isConnectable()) {
- handleConnect(cstate, sc);
+ // Only needed once
+ sk.interestOps(sk.interestOps()
+ & ~SelectionKey.OP_CONNECT);
+ handleConnect(sdata, cstate, sc);
+ if (debug) {
+ log.info("Wakeup done, connect" + selected
+ + " " + (lastWakeup - now)
+ + " ready: " + readyOps + " "
+ + sk.readyOps() + " " + sk);
+ }
}
if (sk.isValid() && sk.isWritable()) {
- cstate.selectorData.lastWriteResult = 0;
- cstate.dataWriteable(this);
- if (cstate.selectorData.lastWriteResult > 0 &&
- cstate.selectorData.writeInterest) {
+ // Needs to be explicitly re-enabled
+ sk.interestOps(sk.interestOps()
+ & ~SelectionKey.OP_WRITE);
+ sdata.lastWriteResult = 0;
+ if (debug) {
+ log.info("dataWritable " + selected
+ + " " + (lastWakeup - now)
+ + " ready: " + readyOps + " "
+ + sk.readyOps() + " " + cstate +
+ " " + sk);
+ }
+ cstate.dataWriteable(sdata);
+
+ if (sdata.lastWriteResult > 0 &&
+ sdata.writeInterest) {
log.warning("SelectorThread: write interest" +
" after incomplete write");
}
}
if (sk.isReadable()) {
- cstate.selectorData.lastReadResult = 0;
- cstate.dataReceived(this);
- if (cstate.selectorData.lastReadResult > 0 &&
- cstate.selectorData.readInterest) {
- log.warning("SelectorThread: read interest" +
- " after incomplete read");
+ sdata.lastReadResult = 0;
+ if (debug) {
+ log.info("dataReceived " + selected
+ + " " + (lastWakeup - now)
+ + " ready: " + readyOps + " "
+ + sk.readyOps() + " " + cstate +
+ " " + sk);
}
+ cstate.dataReceived(sdata);
+// if (cstate.selectorData.lastReadResult > 0 &&
+// cstate.selectorData.readInterest) {
+// log.warning("SelectorThread: read interest" +
+// " after incomplete read");
+// }
}
} catch (Throwable t) {
t.printStackTrace();
- close(sk, cstate, sc, t);
+ close(sdata, sk, cstate, sc, t, true);
}
}
@@ -240,12 +305,18 @@
} // while(running)
}
- private void handleConnect(SelectorCallback cstate, SocketChannel sc)
+ private void handleConnect(SelectorData sdata, SelectorCallback cstate, SocketChannel sc)
throws IOException, SocketException {
- sc.finishConnect();
+ if (!sc.finishConnect()) {
+ log.warning("handleConnected - finishConnect returns false");
+ }
+ sdata.sel = this;
sc.socket().setSoLinger(true, 0);
- cstate.connected(this);
- readInterest(cstate, true);
+ if (debug) {
+ log.info("connected() " + cstate);
+ }
+ cstate.connected(sdata);
+ readInterest(sdata, true);
}
private void handleAccept(SelectorCallback cstate, SelectionKey sk)
@@ -253,103 +324,94 @@
SelectableChannel selc = sk.channel();
ServerSocketChannel ssc=(ServerSocketChannel)selc;
SocketChannel sockC = ssc.accept();
-// if (sockC == null) {
-// continue;
-// }
sockC.configureBlocking(false);
- SelectorCallback acb = cstate.connectionAccepted(this, sockC);
- acb.selectorData.selKey = sockC.register(selector,
- SelectionKey.OP_READ,
- acb.selectorData);
- acb.selectorData.channelData = sockC;
- acb.selectorData.sel = this;
- active.add(acb);
+ SelectorData selectorData = new SelectorData(this);
+ selectorData.selKey = sockC.register(selector,
+ SelectionKey.OP_READ,
+ selectorData);
+ selectorData.channelData = sockC;
- sk.interestOps(sk.interestOps() | SelectionKey.OP_ACCEPT);
- }
-
- public void updateCallback(SelectorCallback old,
- SelectorCallback cstate) {
- cstate.selectorData = old.selectorData;
- cstate.selectorData.callback = cstate;
- // leave old.selectorData around in case some thread is still using it
- }
-
- public void writeInterest(SelectorCallback cstate, boolean b) {
- if (b && (cstate.selectorData.interest | SelectionKey.OP_WRITE) != 0) {
+ // Find the callback for the new socket
+ SelectorCallback acb =
+ cstate.connectionAccepted(selectorData, sockC);
+ selectorData.callback = acb;
+ if (acb == null) {
+ log.severe("No callback for socket " + selectorData);
+ close(selectorData);
return;
}
- if (!b && (cstate.selectorData.interest | SelectionKey.OP_WRITE) == 0) {
- return;
+
+ synchronized (active) {
+ active.add(selectorData);
}
- if (Thread.currentThread() == selectorThread) {
- cstate.selectorData.writeInterest = b;
- if (cstate.selectorData.writeInterest) {
- cstate.selectorData.interest =
- cstate.selectorData.interest | SelectionKey.OP_WRITE;
- } else {
- cstate.selectorData.interest =
- cstate.selectorData.interest & ~SelectionKey.OP_WRITE;
- }
- SelectionKey sk = (SelectionKey) cstate.selectorData.selKey;
- sk.interestOps(cstate.selectorData.interest);
- return;
+ //sk.interestOps(sk.interestOps() | SelectionKey.OP_ACCEPT);
+ if (debug) {
+ log.info("handleAccept " + cstate);
}
- if (!b) {
- return; // can't remove interest from regular thread
- }
- synchronized (writeInterest) {
- writeInterest.add(cstate);
+ }
+
+ /**
+ * Tricky: we have a socket ( current.selectorData ), we want
+ * it to be associated with a different selectorData.callback, but
+ * what if the IO thread is just making a callback on that
+ * selectorData ?
+ *
+ * We could do it in the IO/thread, or make sure it is atomic.
+ *
+ */
+ @Override
+ public void updateCallback(SelectorData sdata,
+ SelectorCallback current,
+ SelectorCallback cstate) {
+ sdata.pendingCallback = cstate;
+ if (isSelectorThread()) {
+ updateCallbackIOT(sdata);
+ } else {
+ synchronized (updateCallback) {
+ updateCallback.add(sdata);
+ }
}
- selector.wakeup();
}
- public void readInterest(SelectorCallback cstate, boolean b) {
- if (Thread.currentThread() == selectorThread) {
- cstate.selectorData.readInterest = b;
- selThreadUpdateInterest(cstate);
- log.info("Registering read interest");
- return;
- }
- if (b && (cstate.selectorData.interest | SelectionKey.OP_READ) != 0) {
- return;
+ private void processPendingUpdateCallback() {
+ if (updateCallback.size() > 0) {
+ synchronized (updateCallback) {
+ Iterator<SelectorData> ci = updateCallback.iterator();
+ while (ci.hasNext()) {
+ SelectorData cstate = ci.next();
+ updateCallbackIOT(cstate);
+ }
+ updateCallback.clear();
+ }
}
- if (!b && (cstate.selectorData.interest | SelectionKey.OP_READ) == 0) {
- return;
+ }
+
+ private void updateCallbackIOT(SelectorData sdata) {
+ if (debug) {
+ log.info("Callback update " + sdata.pendingCallback + " old=" + sdata.callback);
}
- // Schedule the interest update.
- synchronized (readInterest) {
- readInterest.add(cstate);
+ synchronized (sdata) {
+ sdata.callback = sdata.pendingCallback;
+ sdata.pendingCallback = null;
}
- log.info("Registering pending read interest");
- selector.wakeup();
}
- private void selThreadUpdateInterest(SelectorCallback cstate) {
- SelectionKey sk = (SelectionKey) cstate.selectorData.selKey;
- if (sk != null && sk.isValid()) {
- if (debug) {
- log.info("SelectorThread: process pending: interest " +
- cstate.selectorData.interest + " for " + cstate);
- }
- if (cstate.selectorData.readInterest) {
- cstate.selectorData.interest =
- cstate.selectorData.interest | SelectionKey.OP_READ;
- } else {
- cstate.selectorData.interest =
- cstate.selectorData.interest & ~SelectionKey.OP_READ;
- }
- sk.interestOps(cstate.selectorData.interest);
- }
- }
-
- private void close(SelectionKey sk,
+ private void close(SelectorData sdata,
+ SelectionKey sk,
SelectorCallback cstate,
Channel ch,
- Throwable ex) {
+ Throwable ex, boolean remove) {
try {
- sk.cancel();
+ if (debug) {
+ log.info("-------------> close: " + cstate + " t=" + ex);
+ }
+ if (sk != null && sk.isValid()) {
+ sk.interestOps(0);
+ }
+ if (sk != null) {
+ sk.cancel();
+ }
if (ch instanceof SocketChannel) {
SocketChannel sc = (SocketChannel) ch;
if (sc.isConnected()) {
@@ -368,8 +430,12 @@
}
ch.close();
closed.incrementAndGet();
- cstate.channelClosed(this, ex);
- active.remove(cstate);
+ cstate.channelClosed(sdata, ex);
+ if (remove) {
+ synchronized (active) {
+ active.remove(cstate);
+ }
+ }
} catch (IOException ex2) {
log.severe("SelectorThread: Error closing socket " + ex2);
ex2.printStackTrace();
@@ -379,39 +445,117 @@
// --------------- Socket op abstractions ------------
@Override
- public int readNonBlocking(SelectorCallback cstate, ByteBuffer bb)
+ public int readNonBlocking(SelectorData selectorData, ByteBuffer bb)
throws IOException {
- int done = ((SocketChannel) cstate.selectorData.channelData).read(bb);
- cstate.selectorData.lastReadResult = done;
- if (done < 0) {
+ try {
+ int off = bb.position();
+ int done = ((SocketChannel) selectorData.channelData).read(bb);
if (debug) {
- log.info("SelectorThread: EOF while reading");
+ log.info("-------------readNB rd=" + done + " bb.limit=" +
+ bb.limit() + " pos=" + bb.position() + " " + selectorData.callback);
}
+ if (done > 0) {
+ if (debug) {
+ String s = new String(bb.array(), off,
+ bb.position() - off);
+ log.info("Data:\n" + s);
+ }
+ if (done + off != bb.position()) {
+ System.err.println("XXX");
+ }
+ selectorData.zeroReads = 0;
+ } else if (done < 0) {
+ if (debug) {
+ log.info("SelectorThread: EOF while reading");
+ }
+ } else {
+ // need more...
+ if (selectorData.lastReadResult == 0) {
+ selectorData.zeroReads++;
+ if (selectorData.zeroReads > 6) {
+ log.severe("Double 0 reading ");
+ close(selectorData);
+ return -1;
+ }
+ }
+ }
+ selectorData.lastReadResult = done;
+ return done;
+ } catch(IOException ex) {
+ if (debug) {
+ log.info("readNB error rd=" + -1 + " bblen=" +
+ (bb.limit() - bb.position()) + " " + selectorData.callback + " " + ex);
+ }
+ close(selectorData);
+ return -1;
}
- return done;
}
@Override
- public int writeNonBlocking(SelectorCallback cstate, ByteBuffer bb)
- throws IOException {
- int done = ((SocketChannel) cstate.selectorData.channelData).write(bb);
- cstate.selectorData.lastWriteResult = done;
- return done;
+ public int writeNonBlocking(SelectorData selectorData, ByteBuffer bb)
+ throws IOException {
+ try {
+ if (debug) {
+ log.info("writeNB pos=" + bb.position() + " len=" +
+ (bb.limit() - bb.position()) + " " + selectorData.callback);
+ }
+ if (debug) {
+ String s = new String(bb.array(), bb.position(),
+ bb.limit() - bb.position());
+ log.info("Data:\n" + s);
+ }
+ int done = ((SocketChannel) selectorData.channelData).write(bb);
+ selectorData.lastWriteResult = done;
+ return done;
+ } catch(IOException ex) {
+ if (debug) {
+ log.info("writeNB error pos=" + bb.position() + " len=" +
+ (bb.limit() - bb.position()) + " " + selectorData.callback + " " +
+ ex);
+ }
+ close(selectorData);
+ return -1;
+ }
+ }
+
+ public int getPort(SelectorData sd, boolean remote) {
+ SocketChannel socketChannel = (SocketChannel) sd.channelData;
+
+ if (remote) {
+ return socketChannel.socket().getPort();
+ } else {
+ return socketChannel.socket().getLocalPort();
+ }
+ }
+
+ public InetAddress getAddress(SelectorData sd, boolean remote) {
+ SocketChannel socketChannel = (SocketChannel) sd.channelData;
+
+ if (remote) {
+ return socketChannel.socket().getInetAddress();
+ } else {
+ return socketChannel.socket().getLocalAddress();
+ }
}
/**
*/
@Override
public void connect(String host, int port, SelectorCallback cstate)
- throws IOException {
+ throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
- int o = opened.incrementAndGet();
+ SelectorData selectorData = new SelectorData(this);
+ selectorData.sel = this;
+ selectorData.callback = cstate;
+ selectorData.channelData = socketChannel;
+ selectorData.channelData = socketChannel; // no key
+
socketChannel.connect(new InetSocketAddress(host, port));
- cstate.selectorData.channelData = socketChannel;
- cstate.selectorData.interest = SelectionKey.OP_CONNECT;
+ opened.incrementAndGet();
+
synchronized (connectAcceptInterest) {
- connectAcceptInterest.add(cstate);
+ connectAcceptInterest.add(selectorData);
}
selector.wakeup();
}
@@ -424,14 +568,14 @@
}
// TODO
- public void setSocketOptions(SelectorCallback cstate,
+ public void setSocketOptions(SelectorData selectorData,
int linger,
boolean tcpNoDelay,
int socketTimeout)
throws IOException {
SocketChannel socketChannel =
- (SocketChannel) cstate.selectorData.channelData;
+ (SocketChannel) selectorData.channelData;
Socket socket = socketChannel.socket();
if(linger >= 0 )
@@ -443,14 +587,16 @@
}
@Override
- public int close(SelectorCallback cstate) throws IOException {
- close((SelectionKey) cstate.selectorData.selKey, cstate,
- cstate.selectorData.channelData, null);
+ public int close(SelectorData selectorData) throws IOException {
+ close(selectorData,
+ (SelectionKey) selectorData.selKey, selectorData.callback,
+ selectorData.channelData, null, true);
return 0;
}
+ @Override
public void acceptor(SelectorCallback cstate,
int port,
InetAddress inet,
@@ -475,28 +621,43 @@
ssc.configureBlocking(false);
- cstate.selectorData.channelData = ssc;
- cstate.selectorData.interest = SelectionKey.OP_ACCEPT;
- // cstate must return 'OP_ACCEPT' as interest
+ SelectorData selectorData = new SelectorData(this);
+ selectorData.channelData = ssc; // no key yet
+ selectorData.callback = cstate;
+ // key will be set in pending
+
+
synchronized (connectAcceptInterest) {
- connectAcceptInterest.add(cstate);
+ connectAcceptInterest.add(selectorData);
}
selector.wakeup();
}
+
+ public void ioThreadRun(SelectorData sdata) throws IOException {
+ if (isSelectorThread()) {
+ sdata.callback.ioThreadRun(sdata);
+ } else {
+ synchronized (runInterest) {
+ runInterest.add(sdata);
+ }
+ selector.wakeup();
+ }
+ }
+ @Override
public void inetdAcceptor(SelectorCallback cstate) throws IOException {
SelectorProvider sp=SelectorProvider.provider();
Channel ch=sp.inheritedChannel();
if(ch!=null ) {
- //("Inherited: " + ch.getClass().getName());
+ log.info("Inherited: " + ch.getClass().getName());
// blocking mode
ServerSocketChannel ssc=(ServerSocketChannel)ch;
- cstate.selectorData.channelData = ssc;
- cstate.selectorData.interest = SelectionKey.OP_ACCEPT;
- // cstate must return 'OP_ACCEPT' as interest
+ SelectorData selectorData = new SelectorData(this);
+ selectorData.channelData = ssc;
+
synchronized (connectAcceptInterest) {
- connectAcceptInterest.add(cstate);
+ connectAcceptInterest.add(selectorData);
}
selector.wakeup();
}
@@ -508,31 +669,45 @@
* smallest timeout
* @throws IOException
*/
- void updateSleepTimeAndProcessTimeouts(long now) throws IOException {
+ synchronized void updateSleepTimeAndProcessTimeouts(long now)
+ throws IOException {
long min = Long.MAX_VALUE;
// TODO: test with large sets, maybe sort
- Iterator<SelectorCallback> activeIt = active.iterator();
- while(activeIt.hasNext()) {
- SelectorCallback cstate = activeIt.next();
- if (! cstate.selectorData.channelData.isOpen()) {
- log.info("Closed socket " + cstate.selectorData.channelData);
- activeIt.remove();
- close(cstate); // generate callback, increment counters.
- }
-
- long t = cstate.selectorData.nextTimeEvent;
- if (t == 0) {
- continue;
- }
- if (t < now) {
- // Timeout
- cstate.timeEvent(this);
- // TODO: make sure this is updated if it was selected
- continue;
- }
- if (t < min) {
- min = t;
- }
+ synchronized (active) {
+ Iterator<SelectorData> activeIt = active.iterator();
+
+ while(activeIt.hasNext()) {
+ SelectorData selectorData = activeIt.next();
+ if (! selectorData.channelData.isOpen()) {
+ if (debug) {
+ log.info("Found closed socket, removing " +
+ selectorData.channelData);
+ }
+ activeIt.remove();
+ close(selectorData,
+ (SelectionKey) selectorData.selKey, selectorData.callback,
+ selectorData.channelData, null, false); // generate callback, increment counters.
+ }
+
+ long t = selectorData.nextTimeEvent;
+ if (t == 0) {
+ continue;
+ }
+ if (t < now) {
+ // Timeout
+ if (debug) {
+ log.info("Time event " + selectorData.callback);
+ }
+ if (selectorData.callback != null) {
+ selectorData.callback.timeEvent(selectorData);
+ }
+ // TODO: make sure this is updated if it was selected
+ continue;
+ }
+ if (t < min) {
+ min = t;
+ }
+ }
}
long nextSleep = min - now;
if (nextSleep > maxSleep) {
@@ -542,38 +717,179 @@
} else {
sleepTime = nextSleep;
}
+ if (sleepTime == 0) {
+ System.err.println("XXX");
+ }
nextWakeup = now + sleepTime;
}
- private void processPending() throws IOException {
+ @Override
+ public void writeInterest(SelectorData selectorData, boolean b) {
+ SelectionKey sk = (SelectionKey) selectorData.selKey;
+ if (!sk.isValid()) {
+ return;
+ }
+ int interest = sk.interestOps();
+ if (b && (interest & SelectionKey.OP_WRITE) != 0) {
+ return;
+ }
+ if (!b && (interest & SelectionKey.OP_WRITE) == 0) {
+ return;
+ }
+ if (Thread.currentThread() == selectorThread) {
+ selectorData.writeInterest = b;
+ if (selectorData.writeInterest) {
+ interest =
+ interest | SelectionKey.OP_WRITE;
+ } else {
+ interest =
+ interest & ~SelectionKey.OP_WRITE;
+ }
+ if (interest == 0) {
+ log.warning("No interest " + selectorData.callback);
+ } else {
+ sk.interestOps(interest);
+ }
+ if (debug) {
+ log.info("Write interest " + selectorData.callback + " i=" + interest);
+ }
+ return;
+ }
+ if (!b) {
+ return; // can't remove interest from regular thread
+ }
+ selectorData.writeInterest = b;
+ if (debug) {
+ log.info("Pending write interest " + selectorData.callback);
+ }
+ synchronized (writeInterest) {
+ writeInterest.add(selectorData);
+ }
+ selector.wakeup();
+ }
+
+
+ @Override
+ public void readInterest(SelectorData selectorData, boolean b) throws IOException {
+ if (Thread.currentThread() == selectorThread) {
+ selectorData.readInterest = b;
+ selThreadReadInterest(selectorData);
+ return;
+ }
+ SelectionKey sk = (SelectionKey) selectorData.selKey;
+ int interest = sk.interestOps();
+ selectorData.readInterest = b;
+ if (b && (interest & SelectionKey.OP_READ) != 0) {
+ return;
+ }
+ if (!b && (interest & SelectionKey.OP_READ) == 0) {
+ return;
+ }
+ // Schedule the interest update.
+ synchronized (readInterest) {
+ readInterest.add(selectorData);
+ }
+ if (debug) {
+ log.info("Registering pending read interest");
+ }
+ selector.wakeup();
+ }
+
+
+ private void selThreadReadInterest(SelectorData selectorData) throws IOException {
+ SelectionKey sk = (SelectionKey) selectorData.selKey;
+ if (sk == null) {
+ if (selectorData.readInterest) {
+ if (debug) {
+ log.info("Register again for read interest");
+ }
+ SocketChannel socketChannel =
+ (SocketChannel) selectorData.channelData;
+ if (socketChannel.isOpen()) {
+ selectorData.sel = this;
+ selectorData.selKey =
+ socketChannel.register(selector,
+ SelectionKey.OP_READ, selectorData);
+ selectorData.channelData = socketChannel;
+ }
+ }
+ return;
+ }
+ if (!sk.isValid()) {
+ return;
+ }
+ int interest = sk.interestOps();
+ if (sk != null && sk.isValid()) {
+ if (selectorData.readInterest) {
+// if ((interest | SelectionKey.OP_READ) != 0) {
+// return;
+// }
+ interest =
+ interest | SelectionKey.OP_READ;
+ } else {
+// if ((interest | SelectionKey.OP_READ) == 0) {
+// return;
+// }
+ interest =
+ interest & ~SelectionKey.OP_READ;
+ }
+ if (interest == 0) {
+ log.warning("No interest(rd removed) " + selectorData.callback);
+ // TODO: should we remove it ? It needs to be re-activated
+ // later.
+ sk.cancel(); //??
+ selectorData.selKey = null;
+ } else {
+ sk.interestOps(interest);
+ }
+ if (debug) {
+ log.info(((selectorData.readInterest)
+ ? "RESUME read " : "SUSPEND read ")
+ + selectorData.callback);
+ }
+ }
+ }
+
+
+ private void processPendingConnectAccept() throws IOException {
synchronized (connectAcceptInterest) {
- Iterator<SelectorCallback> ci = connectAcceptInterest.iterator();
+ Iterator<SelectorData> ci = connectAcceptInterest.iterator();
while (ci.hasNext()) {
- SelectorCallback cstate = ci.next();
- SelectionKey sk = (SelectionKey) cstate.selectorData.selKey;
+ SelectorData selectorData = ci.next();
+ SelectionKey sk = (SelectionKey) selectorData.selKey;
// Find host, port - initiate connection
try {
// Accept interest ?
- if (cstate.selectorData.channelData instanceof ServerSocketChannel) {
+ if (selectorData.channelData instanceof ServerSocketChannel) {
ServerSocketChannel socketChannel =
- (ServerSocketChannel) cstate.selectorData.channelData;
- cstate.selectorData.sel = this;
- cstate.selectorData.selKey =
+ (ServerSocketChannel) selectorData.channelData;
+ selectorData.sel = this;
+ selectorData.selKey =
socketChannel.register(selector,
- SelectionKey.OP_ACCEPT, cstate.selectorData);
+ SelectionKey.OP_ACCEPT, selectorData);
- cstate.selectorData.channelData = socketChannel;
- active.add(cstate);
+ selectorData.channelData = socketChannel;
+ synchronized (active) {
+ active.add(selectorData);
+ }
+ if (debug) {
+ log.info("Pending acceptor added: " + selectorData.callback);
+ }
} else {
SocketChannel socketChannel =
- (SocketChannel) cstate.selectorData.channelData;
- cstate.selectorData.sel = this;
- cstate.selectorData.selKey =
+ (SocketChannel) selectorData.channelData;
+ selectorData.sel = this;
+ selectorData.selKey =
socketChannel.register(selector,
- SelectionKey.OP_CONNECT, cstate.selectorData);
- active.add(cstate);
+ SelectionKey.OP_CONNECT, selectorData);
+ synchronized (active) {
+ active.add(selectorData);
+ }
+ if (debug) {
+ log.info("Pending connect added: " + selectorData.callback);
+ }
}
} catch (IOException e) {
e.printStackTrace();
@@ -581,27 +897,59 @@
}
connectAcceptInterest.clear();
}
+ }
+
+ private void processPending() throws IOException {
+ processPendingConnectAccept();
+ processPendingReadWrite();
+ if (runInterest.size() > 0) {
+ synchronized (runInterest) {
+ Iterator<SelectorData> ci = runInterest.iterator();
+ while (ci.hasNext()) {
+ SelectorData cstate = ci.next();
+ try {
+ cstate.callback.ioThreadRun(cstate);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+ if (debug) {
+ log.info("Run in selthread: " + cstate.callback);
+ }
+ }
+ runInterest.clear();
+ }
+ }
+ processPendingUpdateCallback();
+ }
+
+ private void processPendingReadWrite() throws IOException {
// Update interest
if (readInterest.size() > 0) {
synchronized (readInterest) {
- Iterator<SelectorCallback> ci = readInterest.iterator();
+ Iterator<SelectorData> ci = readInterest.iterator();
while (ci.hasNext()) {
- SelectorCallback cstate = ci.next();
- selThreadUpdateInterest(cstate);
+ SelectorData cstate = ci.next();
+ selThreadReadInterest(cstate);
+ if (debug) {
+ log.info("Read interest added: " + cstate);
+ }
}
readInterest.clear();
}
}
if (writeInterest.size() > 0) {
synchronized (writeInterest) {
- Iterator<SelectorCallback> ci = writeInterest.iterator();
+ Iterator<SelectorData> ci = writeInterest.iterator();
while (ci.hasNext()) {
- SelectorCallback cstate = ci.next();
+ SelectorData cstate = ci.next();
// Fake callback - will update as side effect
- cstate.dataWriteable(this);
+ cstate.callback.dataWriteable(cstate);
+ if (debug) {
+ log.info("Write interest, calling dataWritable: " + cstate);
+ }
}
- readInterest.clear();
+ writeInterest.clear();
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org