You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2012/11/24 18:35:47 UTC
svn commit: r1413212 - in /tomcat/trunk/java/org/apache/coyote/http11/upgrade: UpgradeAprProcessor.java UpgradeNioProcessor.java
Author: markt
Date: Sat Nov 24 17:35:46 2012
New Revision: 1413212
URL: http://svn.apache.org/viewvc?rev=1413212&view=rev
Log:
First cut HTTP upgrade for NIO/APR
- Non-blocking not supported
- WebSocket broken
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java
Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java?rev=1413212&r1=1413211&r2=1413212&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java Sat Nov 24 17:35:46 2012
@@ -26,76 +26,86 @@ import org.apache.tomcat.util.net.Socket
public class UpgradeAprProcessor extends UpgradeProcessor<Long> {
- private final long socket;
-
+ private static final int INFINITE_TIMEOUT = -1;
public UpgradeAprProcessor(SocketWrapper<Long> wrapper,
ProtocolHandler httpUpgradeProcessor) {
- super(upgradeInbound);
-
- Socket.timeoutSet(wrapper.getSocket().longValue(),
- upgradeInbound.getReadTimeout());
+ super(httpUpgradeProcessor,
+ new AprUpgradeServletInputStream(wrapper.getSocket().longValue()),
+ new AprUpgradeServletOutputStream(wrapper.getSocket().longValue()));
- this.socket = wrapper.getSocket().longValue();
+ Socket.timeoutSet(wrapper.getSocket().longValue(), INFINITE_TIMEOUT);
}
- /*
- * Output methods
- */
- @Override
- public void flush() throws IOException {
- // NOOP
- }
+ // ----------------------------------------------------------- Inner classes
+ private static class AprUpgradeServletInputStream
+ extends UpgradeServletInputStream {
- @Override
- public void write(int b) throws IOException {
- Socket.send(socket, new byte[] {(byte) b}, 0, 1);
- }
+ private final long socket;
+ public AprUpgradeServletInputStream(long socket) {
+ this.socket = socket;
+ }
- @Override
- public void write(byte[]b, int off, int len) throws IOException {
- Socket.send(socket, b, off, len);
+ @Override
+ protected int doRead() throws IOException {
+ byte[] bytes = new byte[1];
+ int result = Socket.recv(socket, bytes, 0, 1);
+ if (result == -1) {
+ return -1;
+ } else {
+ return bytes[0] & 0xFF;
+ }
+ }
+
+ @Override
+ protected int doRead(byte[] b, int off, int len) throws IOException {
+ boolean block = true;
+ if (!block) {
+ Socket.optSet(socket, Socket.APR_SO_NONBLOCK, -1);
+ }
+ try {
+ int result = Socket.recv(socket, b, off, len);
+ if (result > 0) {
+ return result;
+ } else if (-result == Status.EAGAIN) {
+ return 0;
+ } else {
+ throw new IOException(sm.getString("apr.error",
+ Integer.valueOf(-result)));
+ }
+ } finally {
+ if (!block) {
+ Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 0);
+ }
+ }
+ }
}
+ private static class AprUpgradeServletOutputStream
+ extends UpgradeServletOutputStream {
+
+ private final long socket;
- /*
- * Input methods
- */
- @Override
- public int read() throws IOException {
- byte[] bytes = new byte[1];
- int result = Socket.recv(socket, bytes, 0, 1);
- if (result == -1) {
- return -1;
- } else {
- return bytes[0] & 0xFF;
+ public AprUpgradeServletOutputStream(long socket) {
+ this.socket = socket;
}
- }
+ @Override
+ protected void doWrite(int b) throws IOException {
+ Socket.send(socket, new byte[] {(byte) b}, 0, 1);
+ }
- @Override
- public int read(boolean block, byte[] bytes, int off, int len)
- throws IOException {
- if (!block) {
- Socket.optSet(socket, Socket.APR_SO_NONBLOCK, -1);
- }
- try {
- int result = Socket.recv(socket, bytes, off, len);
- if (result > 0) {
- return result;
- } else if (-result == Status.EAGAIN) {
- return 0;
- } else {
- throw new IOException(sm.getString("apr.error",
- Integer.valueOf(-result)));
- }
- } finally {
- if (!block) {
- Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 0);
- }
+ @Override
+ protected void doWrite(byte[] b, int off, int len) throws IOException {
+ Socket.send(socket, b, off, len);
+ }
+
+ @Override
+ protected void doFlush() throws IOException {
+ // NO-OP
}
}
}
Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java?rev=1413212&r1=1413211&r2=1413212&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java Sat Nov 24 17:35:46 2012
@@ -30,204 +30,219 @@ import org.apache.tomcat.util.net.Socket
public class UpgradeNioProcessor extends UpgradeProcessor<NioChannel> {
- private final NioChannel nioChannel;
- private final NioSelectorPool pool;
- private final int maxRead;
- private final int maxWrite;
+ private static final int INFINITE_TIMEOUT = -1;
public UpgradeNioProcessor(SocketWrapper<NioChannel> wrapper,
ProtocolHandler httpUpgradeProcessor, NioSelectorPool pool) {
- super(upgradeInbound);
+ super(httpUpgradeProcessor,
+ new NioUpgradeServletInputStream(wrapper, pool),
+ new NioUpgradeServletOutputStream(wrapper, pool));
- wrapper.setTimeout(upgradeInbound.getReadTimeout());
-
- this.nioChannel = wrapper.getSocket();
- this.pool = pool;
- this.maxRead = nioChannel.getBufHandler().getReadBuffer().capacity();
- this.maxWrite = nioChannel.getBufHandler().getWriteBuffer().capacity();
+ wrapper.setTimeout(INFINITE_TIMEOUT);
}
- /*
- * Output methods
- */
- @Override
- public void flush() throws IOException {
- NioEndpoint.KeyAttachment att =
- (NioEndpoint.KeyAttachment) nioChannel.getAttachment(false);
- if (att == null) {
- throw new IOException("Key must be cancelled");
- }
- long writeTimeout = att.getTimeout();
- Selector selector = null;
- try {
- selector = pool.get();
- } catch ( IOException x ) {
- //ignore
- }
- try {
- do {
- if (nioChannel.flush(true, selector, writeTimeout)) {
- break;
- }
- } while (true);
- } finally {
- if (selector != null) {
- pool.put(selector);
+ // ----------------------------------------------------------- Inner classes
+
+ private static class NioUpgradeServletInputStream
+ extends UpgradeServletInputStream {
+
+ private final NioChannel nioChannel;
+ private final NioSelectorPool pool;
+ private final int maxRead;
+
+ public NioUpgradeServletInputStream(SocketWrapper<NioChannel> wrapper,
+ NioSelectorPool pool) {
+ nioChannel = wrapper.getSocket();
+ this.pool = pool;
+ maxRead = nioChannel.getBufHandler().getReadBuffer().capacity();
+ }
+
+ @Override
+ protected int doRead() throws IOException {
+ byte[] bytes = new byte[1];
+ int result = readSocket(true, bytes, 0, 1);
+ if (result == -1) {
+ return -1;
+ } else {
+ return bytes[0] & 0xFF;
}
}
- }
- @Override
- public void write(int b) throws IOException {
- writeToSocket(new byte[] {(byte) b}, 0, 1);
- }
+ @Override
+ protected int doRead(byte[] b, int off, int len) throws IOException {
+ if (len > maxRead) {
+ return readSocket(true, b, off, maxRead);
+ } else {
+ return readSocket(true, b, off, len);
+ }
+ }
+
+ private int readSocket(boolean block, byte[] b, int off, int len)
+ throws IOException {
+
+ ByteBuffer readBuffer = nioChannel.getBufHandler().getReadBuffer();
+ int remaining = readBuffer.remaining();
+
+ // Is there enough data in the read buffer to satisfy this request?
+ if (remaining >= len) {
+ readBuffer.get(b, off, len);
+ return len;
+ }
+
+ // Copy what data there is in the read buffer to the byte array
+ int leftToWrite = len;
+ int newOffset = off;
+ if (remaining > 0) {
+ readBuffer.get(b, off, remaining);
+ leftToWrite -= remaining;
+ newOffset += remaining;
+ }
+
+ // Fill the read buffer as best we can
+ readBuffer.clear();
+ int nRead = fillReadBuffer(block);
+
+ // Fill as much of the remaining byte array as possible with the data
+ // that was just read
+ if (nRead > 0) {
+ readBuffer.flip();
+ readBuffer.limit(nRead);
+ if (nRead > leftToWrite) {
+ readBuffer.get(b, newOffset, leftToWrite);
+ leftToWrite = 0;
+ } else {
+ readBuffer.get(b, newOffset, nRead);
+ leftToWrite -= nRead;
+ }
+ } else if (nRead == 0) {
+ readBuffer.flip();
+ readBuffer.limit(nRead);
+ } else if (nRead == -1) {
+ throw new EOFException(sm.getString("nio.eof.error"));
+ }
- @Override
- public void write(byte[]b, int off, int len) throws IOException {
- int written = 0;
- while (len - written > maxWrite) {
- written += writeToSocket(b, off + written, maxWrite);
+ return len - leftToWrite;
}
- writeToSocket(b, off + written, len - written);
- }
- /*
- * Input methods
- */
- @Override
- public int read() throws IOException {
- byte[] bytes = new byte[1];
- int result = readSocket(true, bytes, 0, 1);
- if (result == -1) {
- return -1;
- } else {
- return bytes[0] & 0xFF;
+ private int fillReadBuffer(boolean block) throws IOException {
+ int nRead;
+ if (block) {
+ Selector selector = null;
+ try {
+ selector = pool.get();
+ } catch ( IOException x ) {
+ // Ignore
+ }
+ try {
+ NioEndpoint.KeyAttachment att =
+ (NioEndpoint.KeyAttachment) nioChannel.getAttachment(false);
+ if (att == null) {
+ throw new IOException("Key must be cancelled.");
+ }
+ nRead = pool.read(nioChannel.getBufHandler().getReadBuffer(),
+ nioChannel, selector, att.getTimeout());
+ } catch (EOFException eof) {
+ nRead = -1;
+ } finally {
+ if (selector != null) {
+ pool.put(selector);
+ }
+ }
+ } else {
+ nRead = nioChannel.read(nioChannel.getBufHandler().getReadBuffer());
+ }
+ return nRead;
}
}
- @Override
- public int read(boolean block, byte[] bytes, int off, int len)
- throws IOException {
- if (len > maxRead) {
- return readSocket(block, bytes, off, maxRead);
- } else {
- return readSocket(block, bytes, off, len);
+ private static class NioUpgradeServletOutputStream
+ extends UpgradeServletOutputStream {
+
+ private final NioChannel nioChannel;
+ private final NioSelectorPool pool;
+ private final int maxWrite;
+
+ public NioUpgradeServletOutputStream(
+ SocketWrapper<NioChannel> wrapper, NioSelectorPool pool) {
+ nioChannel = wrapper.getSocket();
+ this.pool = pool;
+ maxWrite = nioChannel.getBufHandler().getWriteBuffer().capacity();
}
- }
+ @Override
+ protected void doWrite(int b) throws IOException {
+ writeToSocket(new byte[] {(byte) b}, 0, 1);
+ }
- /*
- * Adapted from the NioInputBuffer.
- */
- private int readSocket(boolean block, byte[] bytes, int offset, int len)
- throws IOException {
-
- ByteBuffer readBuffer = nioChannel.getBufHandler().getReadBuffer();
- int remaining = readBuffer.remaining();
-
- // Is there enough data in the read buffer to satisfy this request?
- if (remaining >= len) {
- readBuffer.get(bytes, offset, len);
- return len;
- }
-
- // Copy what data there is in the read buffer to the byte array
- int leftToWrite = len;
- int newOffset = offset;
- if (remaining > 0) {
- readBuffer.get(bytes, offset, remaining);
- leftToWrite -= remaining;
- newOffset += remaining;
- }
-
- // Fill the read buffer as best we can
- readBuffer.clear();
- int nRead = fillReadBuffer(block);
-
- // Full as much of the remaining byte array as possible with the data
- // that was just read
- if (nRead > 0) {
- readBuffer.flip();
- readBuffer.limit(nRead);
- if (nRead > leftToWrite) {
- readBuffer.get(bytes, newOffset, leftToWrite);
- leftToWrite = 0;
- } else {
- readBuffer.get(bytes, newOffset, nRead);
- leftToWrite -= nRead;
+ @Override
+ protected void doWrite(byte[] b, int off, int len) throws IOException {
+ int written = 0;
+ while (len - written > maxWrite) {
+ written += writeToSocket(b, off + written, maxWrite);
}
- } else if (nRead == 0) {
- readBuffer.flip();
- readBuffer.limit(nRead);
- } else if (nRead == -1) {
- throw new EOFException(sm.getString("nio.eof.error"));
+ writeToSocket(b, off + written, len - written);
}
- return len - leftToWrite;
- }
-
- private int fillReadBuffer(boolean block) throws IOException {
- int nRead;
- if (block) {
+ @Override
+ protected void doFlush() throws IOException {
+ NioEndpoint.KeyAttachment att =
+ (NioEndpoint.KeyAttachment) nioChannel.getAttachment(false);
+ if (att == null) {
+ throw new IOException("Key must be cancelled");
+ }
+ long writeTimeout = att.getTimeout();
Selector selector = null;
try {
selector = pool.get();
} catch ( IOException x ) {
- // Ignore
+ //ignore
}
try {
- NioEndpoint.KeyAttachment att =
- (NioEndpoint.KeyAttachment) nioChannel.getAttachment(false);
- if (att == null) {
- throw new IOException("Key must be cancelled.");
- }
- nRead = pool.read(nioChannel.getBufHandler().getReadBuffer(),
- nioChannel, selector, att.getTimeout());
- } catch (EOFException eof) {
- nRead = -1;
+ do {
+ if (nioChannel.flush(true, selector, writeTimeout)) {
+ break;
+ }
+ } while (true);
} finally {
if (selector != null) {
pool.put(selector);
}
}
- } else {
- nRead = nioChannel.read(nioChannel.getBufHandler().getReadBuffer());
}
- return nRead;
- }
- /*
- * Adapted from the NioOutputBuffer
- */
- private synchronized int writeToSocket(byte[] bytes, int off, int len)
- throws IOException {
-
- nioChannel.getBufHandler().getWriteBuffer().clear();
- nioChannel.getBufHandler().getWriteBuffer().put(bytes, off, len);
- nioChannel.getBufHandler().getWriteBuffer().flip();
-
- int written = 0;
- NioEndpoint.KeyAttachment att =
- (NioEndpoint.KeyAttachment) nioChannel.getAttachment(false);
- if (att == null) {
- throw new IOException("Key must be cancelled");
- }
- long writeTimeout = att.getTimeout();
- Selector selector = null;
- try {
- selector = pool.get();
- } catch ( IOException x ) {
- //ignore
- }
- try {
- written = pool.write(nioChannel.getBufHandler().getWriteBuffer(),
- nioChannel, selector, writeTimeout, true);
- } finally {
- if (selector != null) {
- pool.put(selector);
+ /*
+ * Adapted from the NioOutputBuffer
+ */
+ private synchronized int writeToSocket(byte[] bytes, int off, int len)
+ throws IOException {
+
+ nioChannel.getBufHandler().getWriteBuffer().clear();
+ nioChannel.getBufHandler().getWriteBuffer().put(bytes, off, len);
+ nioChannel.getBufHandler().getWriteBuffer().flip();
+
+ int written = 0;
+ NioEndpoint.KeyAttachment att =
+ (NioEndpoint.KeyAttachment) nioChannel.getAttachment(false);
+ if (att == null) {
+ throw new IOException("Key must be cancelled");
+ }
+ long writeTimeout = att.getTimeout();
+ Selector selector = null;
+ try {
+ selector = pool.get();
+ } catch ( IOException x ) {
+ //ignore
+ }
+ try {
+ written = pool.write(nioChannel.getBufHandler().getWriteBuffer(),
+ nioChannel, selector, writeTimeout, true);
+ } finally {
+ if (selector != null) {
+ pool.put(selector);
+ }
}
+ return written;
}
- return written;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org