You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by co...@apache.org on 2012/03/13 06:37:15 UTC
svn commit: r1299981 - in /tomcat/trunk/java/org/apache: coyote/spdy/
tomcat/spdy/
Author: costin
Date: Tue Mar 13 05:37:14 2012
New Revision: 1299981
URL: http://svn.apache.org/viewvc?rev=1299981&view=rev
Log:
Update the spdy implementation to use the non-blocking apr socket.
Fix various bugs found while stress testing.
Added:
tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextJni.java
Modified:
tomcat/trunk/java/org/apache/coyote/spdy/SpdyAprNpnHandler.java
tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
tomcat/trunk/java/org/apache/tomcat/spdy/SpdyConnection.java
tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContext.java
tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextProxy.java
tomcat/trunk/java/org/apache/tomcat/spdy/SpdyFrame.java
tomcat/trunk/java/org/apache/tomcat/spdy/SpdyStream.java
Modified: tomcat/trunk/java/org/apache/coyote/spdy/SpdyAprNpnHandler.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/spdy/SpdyAprNpnHandler.java?rev=1299981&r1=1299980&r2=1299981&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/spdy/SpdyAprNpnHandler.java (original)
+++ tomcat/trunk/java/org/apache/coyote/spdy/SpdyAprNpnHandler.java Tue Mar 13 05:37:14 2012
@@ -27,9 +27,11 @@ import org.apache.juli.logging.LogFactor
import org.apache.tomcat.jni.Error;
import org.apache.tomcat.jni.SSLExt;
import org.apache.tomcat.jni.Status;
+import org.apache.tomcat.jni.socket.AprSocketContext;
import org.apache.tomcat.spdy.CompressDeflater6;
import org.apache.tomcat.spdy.SpdyConnection;
import org.apache.tomcat.spdy.SpdyContext;
+import org.apache.tomcat.spdy.SpdyContextJni;
import org.apache.tomcat.spdy.SpdyStream;
import org.apache.tomcat.util.net.AbstractEndpoint;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
@@ -65,7 +67,7 @@ public class SpdyAprNpnHandler implement
private static final Log log = LogFactory.getLog(AprEndpoint.class);
- private SpdyContext spdyContext;
+ private SpdyContextApr spdyContext;
boolean ssl = true;
@@ -88,7 +90,7 @@ public class SpdyAprNpnHandler implement
}
- private final class SpdyContextApr extends SpdyContext {
+ private final class SpdyContextApr extends SpdyContextJni {
private final AbstractEndpoint ep;
private final Adapter adapter;
@@ -106,76 +108,6 @@ public class SpdyAprNpnHandler implement
}
}
- public static class SpdyConnectionApr extends SpdyConnection {
- long socket;
-
- public SpdyConnectionApr(SocketWrapper<Long> socketW,
- SpdyContext spdyContext, boolean ssl) {
- super(spdyContext);
- this.socket = socketW.getSocket().longValue();
- if (ssl) {
- setCompressSupport(new CompressDeflater6());
- }
- }
-
- // TODO: write/read should go to SocketWrapper.
- @Override
- public int write(byte[] data, int off, int len) {
- if (socket == 0 || inClosed) {
- return -1;
- }
- int rem = len;
- while (rem > 0) {
- int sent = org.apache.tomcat.jni.Socket.send(socket, data, off,
- rem);
- if (sent < 0) {
- inClosed = true;
- return -1;
- }
- if (sent == 0) {
- return len - rem;
- }
- rem -= sent;
- off += sent;
- }
- return len;
- }
-
- /**
- */
- @Override
- public int read(byte[] data, int off, int len) throws IOException {
- if (socket == 0 || inClosed) {
- return 0;
- }
- int rd = org.apache.tomcat.jni.Socket.recv(socket, data, off, len);
- if (rd == -Status.APR_EOF) {
- inClosed = true;
- return -1;
- }
- if (rd == -Status.TIMEUP) {
- rd = 0;
- }
- if (rd == -Status.EAGAIN) {
- rd = 0;
- }
- if (rd < 0) {
- // all other errors
- inClosed = true;
- throw new IOException("Error: " + rd + " "
- + Error.strerror((int) -rd));
- }
- off += rd;
- len -= rd;
- return rd;
- }
- }
-
- // apr normally creates a new object on each poll.
- // For 'upgraded' protocols we need to remember it's handled differently.
- Map<Long, SpdyConnectionApr> lightProcessors =
- new HashMap<Long, SpdyConnectionApr>();
-
@Override
public SocketState process(SocketWrapper<Long> socketO, SocketStatus status,
Http11AprProtocol proto, AbstractEndpoint endpoint) {
@@ -183,61 +115,12 @@ public class SpdyAprNpnHandler implement
SocketWrapper<Long> socketW = socketO;
long socket = ((Long) socketW.getSocket()).longValue();
- SpdyConnectionApr lh = lightProcessors.get(socket);
- // Are we getting an HTTP request ?
- if (lh == null && status != SocketStatus.OPEN) {
- return null;
- }
-
- log.info("Status: " + status);
-
- SocketState ss = null;
- if (lh != null) {
- // STOP, ERROR, DISCONNECT, TIMEOUT -> onClose
- if (status == SocketStatus.TIMEOUT) {
- // Called from maintain - we're removed from the poll
- ((AprEndpoint) endpoint).getCometPoller().add(
- socketO.getSocket().longValue(), false);
- return SocketState.LONG;
- }
- if (status == SocketStatus.STOP || status == SocketStatus.DISCONNECT ||
- status == SocketStatus.ERROR) {
- SpdyConnectionApr wrapper = lightProcessors.remove(socket);
- if (wrapper != null) {
- wrapper.onClose();
- }
- return SocketState.CLOSED;
- }
- int rc = lh.onBlockingSocket();
- ss = (rc == SpdyConnection.LONG) ? SocketState.LONG
- : SocketState.CLOSED;
- } else {
- // OPEN, no existing socket
- if (!ssl || SSLExt.checkNPN(socket, SpdyContext.SPDY_NPN)) {
- // NPN negotiated or not ssl
- lh = new SpdyConnectionApr(socketW, spdyContext, ssl);
-
- int rc = lh.onBlockingSocket();
- ss = (rc == SpdyConnection.LONG) ? SocketState.LONG
- : SocketState.CLOSED;
- if (ss == SocketState.LONG) {
- lightProcessors.put(socketO.getSocket().longValue(), lh);
- }
- } else {
- return null;
- }
- }
-
- // OPEN is used for both 'first time' and 'new connection'
- // In theory we shouldn't get another open while this is in
- // progress ( only after we add back to the poller )
-
- if (ss == SocketState.LONG) {
- log.info("Long poll: " + status);
- ((AprEndpoint) endpoint).getCometPoller().add(
- socketO.getSocket().longValue(), false);
+ try {
+ spdyContext.onAccept(socket);
+ } catch (IOException e) {
}
- return ss;
+ // No need to keep tomcat thread busy - but socket will be handled by apr socket context.
+ return SocketState.LONG;
}
public void onClose(SocketWrapper<Long> socketWrapper) {
Modified: tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java?rev=1299981&r1=1299980&r2=1299981&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java Tue Mar 13 05:37:14 2012
@@ -93,7 +93,7 @@ public class SpdyProcessor extends Abstr
public int doRead(ByteChunk bchunk, Request request) throws IOException {
if (inFrame == null) {
// blocking
- inFrame = spdyStream.getIn(endpoint.getSoTimeout());
+ inFrame = spdyStream.getDataFrame(endpoint.getSoTimeout());
}
if (inFrame == null) {
return -1;
@@ -388,13 +388,6 @@ public class SpdyProcessor extends Abstr
}
- private static byte[] STATUS = "status".getBytes();
-
- private static byte[] VERSION = "version".getBytes();
-
- private static byte[] HTTP11 = "HTTP/1.1".getBytes();
-
- private static byte[] OK200 = "200 OK".getBytes();
/**
* When committing the response, we have to validate the set of headers, as
@@ -424,8 +417,6 @@ public class SpdyProcessor extends Abstr
private void sendResponseHead() throws IOException {
SpdyFrame rframe = spdy.getFrame(SpdyConnection.TYPE_SYN_REPLY);
- // TODO: is closed ?
- rframe.streamId = spdyStream.reqFrame.streamId;
rframe.associated = 0;
MimeHeaders headers = response.getMimeHeaders();
@@ -444,10 +435,8 @@ public class SpdyProcessor extends Abstr
bc = mb.getByteChunk();
rframe.headerValue(bc.getBuffer(), bc.getStart(), bc.getLength());
}
- rframe.headerName(STATUS, 0, STATUS.length);
-
if (response.getStatus() == 0) {
- rframe.headerValue(OK200, 0, OK200.length);
+ rframe.addHeader(SpdyFrame.STATUS, SpdyFrame.OK200);
} else {
// HTTP header contents
String message = null;
@@ -466,12 +455,13 @@ public class SpdyProcessor extends Abstr
// TODO: optimize
String status = response.getStatus() + " " + message;
byte[] statusB = status.getBytes();
+ rframe.headerName(SpdyFrame.STATUS, 0, SpdyFrame.STATUS.length);
rframe.headerValue(statusB, 0, statusB.length);
}
- rframe.headerName(VERSION, 0, VERSION.length);
- rframe.headerValue(HTTP11, 0, HTTP11.length);
+ rframe.addHeader(SpdyFrame.VERSION, SpdyFrame.HTTP11);
- spdy.sendFrameBlocking(rframe, spdyStream);
+ rframe.streamId = spdyStream.reqFrame.streamId;
+ spdy.send(rframe, spdyStream);
// we can't reuse the frame - it'll be queued, the coyote processor
// may be reused as well.
outCommit = true;
Modified: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyConnection.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyConnection.java?rev=1299981&r1=1299980&r2=1299981&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyConnection.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyConnection.java Tue Mar 13 05:37:14 2012
@@ -17,8 +17,11 @@
package org.apache.tomcat.spdy;
import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -124,10 +127,40 @@ public abstract class SpdyConnection { /
private Condition outCondition;
+ public static final int LONG = 1;
+
+ public static final int CLOSE = -1;
+
+ private SpdyFrame nextFrame;
+
+ /**
+ * Handles the out queue for blocking sockets.
+ */
+ SpdyFrame out;
+
+ boolean draining = false;
+
+ private int goAway = Integer.MAX_VALUE;
+
public SpdyConnection(SpdyContext spdyContext) {
- this.setSpdyContext(spdyContext);
+ this.spdyContext = spdyContext;
outCondition = framerLock.newCondition();
}
+
+ public String toString() {
+ return "SpdyCon open=" + channels.size();
+ }
+
+ public void dump(PrintWriter out) {
+ out.println("SpdyConnection open=" + channels.size() +
+ " outQ:" + outQueue.size());
+ for (SpdyStream str: channels.values()) {
+ str.dump(out);
+ }
+
+ out.println();
+
+ }
/**
* Write.
@@ -140,6 +173,8 @@ public abstract class SpdyConnection { /
*/
public abstract int read(byte[] data, int off, int len) throws IOException;
+ public abstract void close() throws IOException;
+
public void setCompressSupport(CompressSupport cs) {
compressSupport = cs;
}
@@ -151,7 +186,7 @@ public abstract class SpdyConnection { /
return frame;
}
- public SpdyFrame getDataFrame() throws IOException {
+ public SpdyFrame getDataFrame() {
SpdyFrame frame = getSpdyContext().getFrame();
return frame;
}
@@ -167,17 +202,24 @@ public abstract class SpdyConnection { /
* - for fully non-blocking write: there will be a drain callback.
*/
- /**
- * Handles the out queue for blocking sockets.
- */
- SpdyFrame out;
-
- boolean draining = false;
+ public void drain() {
+ synchronized (nbDrain) {
+ if (draining) {
+ return;
+ }
+ draining = true;
+ }
+ _drain();
+ synchronized (nbDrain) {
+ draining = false;
+ }
+ }
+
/**
* Non blocking if the socket is not blocking.
*/
- private boolean drain() {
+ private boolean _drain() {
while (true) {
framerLock.lock();
@@ -190,9 +232,18 @@ public abstract class SpdyConnection { /
if (out == null) {
return false;
}
+ if (goAway < out.streamId) {
+
+ }
SpdyFrame oframe = out;
try {
- if (oframe.type == TYPE_SYN_STREAM) {
+ if (!oframe.c) {
+ // late: IDs are assigned as we send ( priorities may affect
+ // the transmission order )
+ if (oframe.stream != null) {
+ oframe.streamId = oframe.stream.getRequest().streamId;
+ }
+ } else if (oframe.type == TYPE_SYN_STREAM) {
oframe.fixNV(18);
if (compressSupport != null) {
compressSupport.compress(oframe, 18);
@@ -211,7 +262,9 @@ public abstract class SpdyConnection { /
if (oframe.type == TYPE_SYN_STREAM) {
oframe.streamId = outStreamId;
outStreamId += 2;
- channels.put(oframe.streamId, oframe.stream);
+ synchronized(channels) {
+ channels.put(oframe.streamId, oframe.stream);
+ }
}
oframe.serializeHead();
@@ -231,15 +284,20 @@ public abstract class SpdyConnection { /
try {
int toWrite = out.endData - out.off;
- int wr = write(out.data, out.off, toWrite);
- if (wr < 0) {
- return false;
- }
- if (wr < toWrite) {
- out.off += wr;
- return true; // non blocking connection
- }
- out.off += wr;
+ int wr;
+ while (toWrite > 0) {
+ wr = write(out.data, out.off, toWrite);
+ if (wr < 0) {
+ return false;
+ }
+ if (wr == 0) {
+ return true; // non blocking or to
+ }
+ if (wr <= toWrite) {
+ out.off += wr;
+ toWrite -= wr;
+ }
+ }
// Frame was sent
framerLock.lock();
try {
@@ -247,6 +305,13 @@ public abstract class SpdyConnection { /
} finally {
framerLock.unlock();
}
+
+ synchronized (channels) {
+ if (out.stream != null &&
+ out.stream.finRcvd && out.stream.finSent) {
+ channels.remove(out.streamId);
+ }
+ }
out = null;
} catch (IOException e) {
// connection closed - abort all streams
@@ -258,31 +323,6 @@ public abstract class SpdyConnection { /
}
/**
- * Blocking call for sendFrame: must be called from a thread pool.
- *
- * Will wait until the actual frame is sent.
- */
- public void sendFrameBlocking(SpdyFrame oframe, SpdyStream proc)
- throws IOException {
- queueFrame(oframe, proc, oframe.pri == 0 ? outQueue : prioriyQueue);
-
- nonBlockingDrain();
-
- while (!inClosed) {
- framerLock.lock();
- try {
- if (oframe.off == oframe.endData) {
- return; // was sent
- }
- outCondition.await();
- } catch (InterruptedException e) {
- } finally {
- framerLock.unlock();
- }
- }
- }
-
- /**
* Send as much as possible without blocking.
*
* With a nb transport it should call drain directly.
@@ -296,23 +336,11 @@ public abstract class SpdyConnection { /
Runnable nbDrain = new Runnable() {
public void run() {
- int i = drainCnt++;
- long t0 = System.currentTimeMillis();
- synchronized (nbDrain) {
- if (draining) {
- return;
- }
- draining = true;
- }
-
drain();
- synchronized (nbDrain) {
- draining = false;
- }
}
};
- public void sendFrameNonBlocking(SpdyFrame oframe, SpdyStream proc)
+ public void send(SpdyFrame oframe, SpdyStream proc)
throws IOException {
queueFrame(oframe, proc, oframe.pri == 0 ? outQueue : prioriyQueue);
nonBlockingDrain();
@@ -326,7 +354,7 @@ public abstract class SpdyConnection { /
// We can't assing a stream ID until it is sent - priorities
// we can't compress either - it's stateful.
oframe.stream = proc;
-
+
framerLock.lock();
try {
outQueue.add(oframe);
@@ -370,12 +398,6 @@ public abstract class SpdyConnection { /
}
}
- public static final int LONG = 1;
-
- public static final int CLOSE = -1;
-
- private SpdyFrame nextFrame;
-
/**
* Non-blocking method, read as much as possible and return.
*/
@@ -389,61 +411,66 @@ public abstract class SpdyConnection { /
inFrame.data = new byte[16 * 1024];
}
// we might already have data from previous frame
- if (inFrame.endData < 8 || // we don't have the header
- inFrame.endData < inFrame.endFrame) { // size != 0 - we
- // parsed the header
-
- int rd = read(inFrame.data, inFrame.endData,
- inFrame.data.length - inFrame.endData);
- if (rd < 0) {
- abort("Closed");
+ if (inFrame.endReadData < 8 || // we don't have the header
+ inFrame.endReadData < inFrame.endData) {
+
+ int rd = read(inFrame.data, inFrame.endReadData,
+ inFrame.data.length - inFrame.endReadData);
+ if (rd == -1) {
+ if (channels.size() == 0) {
+ return CLOSE;
+ } else {
+ abort("Closed");
+ }
+ } else if (rd < 0) {
+ abort("Closed - read error");
return CLOSE;
- }
- if (rd == 0) {
+ } else if (rd == 0) {
return LONG;
// Non-blocking channel - will resume reading at off
}
- inFrame.endData += rd;
+ inFrame.endReadData += rd;
}
- if (inFrame.endData < 8) {
+ if (inFrame.endReadData < 8) {
continue; // keep reading
}
- // We got the frame head
- if (inFrame.endFrame == 0) {
+ if (inFrame.endData == 0) {
inFrame.parse();
if (inFrame.version != 2) {
abort("Wrong version");
return CLOSE;
}
- // MAx_FRAME_SIZE
- if (inFrame.endFrame < 0 || inFrame.endFrame > 32000) {
- abort("Framing error, size = " + inFrame.endFrame);
+ // MAX_FRAME_SIZE
+ if (inFrame.endData < 0 || inFrame.endData > 32000) {
+ abort("Framing error, size = " + inFrame.endData);
return CLOSE;
}
- // grow the buffer if needed. no need to copy the head, parsed
- // ( maybe for debugging ).
- if (inFrame.data.length < inFrame.endFrame) {
- inFrame.data = new byte[inFrame.endFrame];
+ // TODO: if data, split it in 2 frames
+ // grow the buffer if needed.
+ if (inFrame.data.length < inFrame.endData) {
+ byte[] tmp = new byte[inFrame.endData];
+ System.arraycopy(inFrame.data, 0, tmp, 0, inFrame.endReadData);
+ inFrame.data = tmp;
}
}
- if (inFrame.endData < inFrame.endFrame) {
+ if (inFrame.endReadData < inFrame.endData) {
continue; // keep reading to fill current frame
}
// else: we have at least the current frame
- int extra = inFrame.endData - inFrame.endFrame;
+ int extra = inFrame.endReadData - inFrame.endData;
if (extra > 0) {
// and a bit more - to keep things simple for now we
// copy them to next frame, at least we saved reads.
// it is possible to avoid copy - but later.
nextFrame = getSpdyContext().getFrame();
nextFrame.makeSpace(extra);
- System.arraycopy(inFrame.data, inFrame.endFrame,
+ System.arraycopy(inFrame.data, inFrame.endData,
nextFrame.data, 0, extra);
- nextFrame.endData = extra;
- inFrame.endData = inFrame.endFrame;
+ nextFrame.endReadData = extra;
+ inFrame.endReadData = inFrame.endData;
}
// decompress
@@ -503,17 +530,38 @@ public abstract class SpdyConnection { /
public void abort(String msg) {
System.err.println(msg);
inClosed = true;
- // TODO: close all streams
+ List<Integer> ch = new ArrayList<Integer>(channels.keySet());
+ for (Integer i: ch) {
+ SpdyStream stream = channels.remove(i);
+ if (stream != null) {
+ stream.onReset();
+ }
+ }
+ }
+
+ public void abort(String msg, int last) {
+ System.err.println(msg);
+ inClosed = true;
+
+ List<Integer> ch = new ArrayList<Integer>(channels.keySet());
+ for (Integer i: ch) {
+ if (i > last) {
+ SpdyStream stream = channels.remove(i);
+ if (stream != null) {
+ stream.onReset();
+ }
+ }
+ }
}
/**
* Process a SPDY connection. Called in a separate thread.
- *
+ *
* @return
* @throws IOException
*/
- public int handleFrame() throws IOException {
+ protected int handleFrame() throws IOException {
if (inFrame.c) {
switch (inFrame.type) {
case TYPE_SETTINGS: {
@@ -529,7 +577,11 @@ public abstract class SpdyConnection { /
case TYPE_GOAWAY: {
int lastStream = inFrame.readInt();
log.info("GOAWAY last=" + lastStream);
- abort("GOAWAY");
+
+ // Server will shut down - but will keep processing the current requests,
+ // up to lastStream. If we sent any new ones - they need to be canceled.
+ abort("GO_AWAY", lastStream);
+ goAway = lastStream;
return CLOSE;
}
case TYPE_RST_STREAM: {
@@ -540,12 +592,19 @@ public abstract class SpdyConnection { /
+ " "
+ ((errCode < RST_ERRORS.length) ? RST_ERRORS[errCode]
: errCode));
- SpdyStream sch = channels.get(inFrame.streamId);
+ SpdyStream sch;
+ synchronized(channels) {
+ sch = channels.get(inFrame.streamId);
+ }
if (sch == null) {
abort("Missing channel " + inFrame.streamId);
return CLOSE;
}
sch.onCtlFrame(inFrame);
+
+ synchronized(channels) {
+ channels.remove(inFrame.streamId);
+ }
inFrame = null;
break;
}
@@ -569,7 +628,10 @@ public abstract class SpdyConnection { /
break;
}
case TYPE_SYN_REPLY: {
- SpdyStream sch = channels.get(inFrame.streamId);
+ SpdyStream sch;
+ synchronized(channels) {
+ sch = channels.get(inFrame.streamId);
+ }
if (sch == null) {
abort("Missing channel");
return CLOSE;
@@ -593,18 +655,26 @@ public abstract class SpdyConnection { /
oframe.append32(inFrame.read32());
oframe.pri = 0x80;
- sendFrameNonBlocking(oframe, null);
+ send(oframe, null);
break;
}
}
} else {
// Data frame
- SpdyStream sch = channels.get(inFrame.streamId);
+ SpdyStream sch;
+ synchronized (channels) {
+ sch = channels.get(inFrame.streamId);
+ }
if (sch == null) {
abort("Missing channel");
return CLOSE;
}
sch.onDataFrame(inFrame);
+ synchronized (channels) {
+ if (sch.finRcvd && sch.finSent) {
+ channels.remove(inFrame.streamId);
+ }
+ }
inFrame = null;
}
return LONG;
@@ -614,14 +684,10 @@ public abstract class SpdyConnection { /
return spdyContext;
}
- public void setSpdyContext(SpdyContext spdyContext) {
- this.spdyContext = spdyContext;
- }
-
public SpdyStream get(String host, String url) throws IOException {
SpdyStream sch = new SpdyStream(this);
- sch.addHeader("host", host);
- sch.addHeader("url", url);
+ sch.getRequest().addHeader("host", host);
+ sch.getRequest().addHeader("url", url);
sch.send();
Modified: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContext.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContext.java?rev=1299981&r1=1299980&r2=1299981&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContext.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContext.java Tue Mar 13 05:37:14 2012
@@ -47,9 +47,12 @@ public class SpdyContext {
private Executor executor;
- int defaultFrameSize = 8196;
+ int defaultFrameSize = 8192;
- public static boolean debug = true;
+ public static boolean debug = false;
+
+ boolean tls = true;
+ boolean compression = true;
/**
* Get a frame - frames are heavy buffers, may be reused.
@@ -62,6 +65,19 @@ public class SpdyContext {
}
/**
+ * Set the max frame size.
+ *
+ * Larger data packets will be split in multiple frames.
+ *
+ * ( the code is currently accepting larger control frames - it's not
+ * clear if we should just reject them, many servers limit header size -
+ * the http connector also has a 8k limit - getMaxHttpHeaderSize )
+ */
+ public void setFrameSize(int frameSize) {
+ defaultFrameSize = frameSize;
+ }
+
+ /**
* Override for server side to return a custom stream.
*/
public SpdyStream getStream(SpdyConnection framer) {
@@ -104,4 +120,14 @@ public class SpdyContext {
public void releaseConnection(SpdyConnection con) {
}
+
+ public void listen(final int port, String cert, String key) throws IOException {
+ throw new IOException("Not implemented");
+ }
+
+ /**
+ * Close all pending connections and free resources.
+ */
+ public void stop() throws IOException {
+ }
}
Added: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextJni.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextJni.java?rev=1299981&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextJni.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextJni.java Tue Mar 13 05:37:14 2012
@@ -0,0 +1,182 @@
+/*
+ */
+package org.apache.tomcat.spdy;
+
+import java.io.IOException;
+
+import org.apache.tomcat.jni.Status;
+import org.apache.tomcat.jni.socket.AprSocket;
+import org.apache.tomcat.jni.socket.AprSocketContext;
+import org.apache.tomcat.jni.socket.AprSocketContext.NonBlockingPollHandler;
+import org.apache.tomcat.jni.socket.AprSocketContext.TlsCertVerifier;
+
+public class SpdyContextJni extends SpdyContext {
+ AprSocketContext con;
+
+ //AprSocketContext socketCtx;
+
+ public SpdyContextJni() {
+ con = new AprSocketContext();
+ //if (insecureCerts) {
+ con.customVerification(new TlsCertVerifier() {
+ @Override
+ public void handshakeDone(AprSocket ch) {
+ }
+ });
+ //}
+ con.setNpn("spdy/2");
+ }
+
+ @Override
+ public SpdyConnection getConnection(String host, int port) throws IOException {
+ SpdyConnectionAprSocket spdy = new SpdyConnectionAprSocket(this);
+
+ AprSocket ch = con.socket(host, port, tls);
+
+ spdy.setSocket(ch);
+
+ ch.connect();
+
+ ch.setHandler(new SpdySocketHandler(spdy));
+
+ // need to consume the input to receive more read events
+ int rc = spdy.processInput();
+ if (rc == SpdyConnection.CLOSE) {
+ ch.close();
+ throw new IOException("Error connecting");
+ }
+
+ return spdy;
+ }
+
+ public void onAccept(long socket) throws IOException {
+ SpdyConnectionAprSocket spdy = new SpdyConnectionAprSocket(SpdyContextJni.this);
+ AprSocket s = con.socket(socket);
+ spdy.setSocket(s);
+
+ SpdySocketHandler handler = new SpdySocketHandler(spdy);
+ s.setHandler(handler);
+ handler.process(s, true, true, false);
+ }
+
+ public void listen(final int port, String cert, String key) throws IOException {
+ con = new AprSocketContext() {
+ protected void onSocket(AprSocket s) throws IOException {
+ SpdyConnectionAprSocket spdy = new SpdyConnectionAprSocket(SpdyContextJni.this);
+ spdy.setSocket(s);
+
+ SpdySocketHandler handler = new SpdySocketHandler(spdy);
+ s.setHandler(handler);
+ }
+ };
+
+ con.setNpn(SpdyContext.SPDY_NPN_OUT);
+ con.setKeys(cert, key);
+
+ con.listen(port);
+ }
+
+ public void stop() throws IOException {
+ con.stop();
+ }
+
+ public AprSocketContext getAprContext() {
+ return con;
+ }
+
+ // NB
+ class SpdySocketHandler implements NonBlockingPollHandler {
+ SpdyConnection con;
+
+ SpdySocketHandler(SpdyConnection con) {
+ this.con = con;
+ }
+
+ @Override
+ public void closed(AprSocket ch) {
+ // not used ( polling not implemented yet )
+ }
+
+ @Override
+ public void process(AprSocket ch, boolean in, boolean out, boolean close) {
+ try {
+ int rc = con.processInput();
+ if (rc == SpdyConnection.CLOSE) {
+ ch.close();
+ }
+ con.drain();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ ch.reset();
+ }
+ }
+
+ @Override
+ public void connected(AprSocket ch) {
+ }
+
+ @Override
+ public void error(AprSocket ch, Throwable t) {
+ }
+
+ }
+
+ public static class SpdyConnectionAprSocket extends SpdyConnection {
+ AprSocket socket;
+
+ public SpdyConnectionAprSocket(SpdyContext spdyContext) {
+ super(spdyContext);
+ //setCompressSupport(new CompressJzlib());
+ if (spdyContext.compression) {
+ setCompressSupport(new CompressDeflater6());
+ }
+ }
+
+ public void setSocket(AprSocket ch) {
+ this.socket = ch;
+ }
+
+ @Override
+ public void close() throws IOException {
+ socket.close();
+ }
+
+ @Override
+ public int write(byte[] data, int off, int len) throws IOException {
+ if (socket == null) {
+ return -1;
+ }
+ int sent = socket.write(data, off, len);
+ if (sent < 0) {
+ return -1;
+ }
+ return sent;
+ }
+
+ /**
+ * @throws IOException
+ */
+ @Override
+ public int read(byte[] data, int off, int len) throws IOException {
+ if (socket == null) {
+ return -1;
+ }
+ int rd = socket.read(data, off, len);
+ // org.apache.tomcat.jni.Socket.recv(socket, data, off, len);
+ if (rd == -Status.APR_EOF) {
+ return -1;
+ }
+ if (rd == -Status.TIMEUP || rd == -Status.EINTR || rd == -Status.EAGAIN) {
+ rd = 0;
+ }
+ if (rd < 0) {
+ return -1;
+ }
+ off += rd;
+ len -= rd;
+ return rd;
+ }
+ }
+
+}
Modified: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextProxy.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextProxy.java?rev=1299981&r1=1299980&r2=1299981&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextProxy.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextProxy.java Tue Mar 13 05:37:14 2012
@@ -17,8 +17,10 @@
package org.apache.tomcat.spdy;
import java.io.IOException;
+import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
+import java.util.concurrent.Semaphore;
/**
* Spdy context for 'proxy' or test mode spdy - no NPN, no SSL, no compression.
@@ -78,6 +80,11 @@ public class SpdyContextProxy extends Sp
}
@Override
+ public void close() throws IOException {
+ socket.close();
+ }
+
+ @Override
public synchronized int write(byte[] data, int off, int len) throws IOException {
socket.getOutputStream().write(data, off, len);
return len;
@@ -92,4 +99,52 @@ public class SpdyContextProxy extends Sp
}
}
}
+
+
+ boolean running = true;
+ ServerSocket serverSocket;
+
+ public void stop() throws IOException {
+ running = false;
+ serverSocket.close();
+ }
+
+ /**
+ * For small servers/testing: run in server mode.
+ * Need to override onSynStream() to implement the logic.
+ */
+ public void listen(final int port, String cert, String key) throws IOException {
+ getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ accept(port);
+ }
+ });
+ }
+
+ private void accept(int port) {
+ try {
+ serverSocket = new ServerSocket(port);
+ while (running) {
+ final Socket socket = serverSocket.accept();
+ final SpdyConnection con = getConnection(socket);
+ getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ con.onBlockingSocket();
+ try {
+ socket.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+ } catch (IOException ex) {
+ if (running) {
+ ex.printStackTrace();
+ }
+ running = false;
+ }
+ }
}
Modified: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyFrame.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyFrame.java?rev=1299981&r1=1299980&r2=1299981&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyFrame.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyFrame.java Tue Mar 13 05:37:14 2012
@@ -16,7 +16,18 @@
*/
package org.apache.tomcat.spdy;
+import java.util.Map;
+
public class SpdyFrame {
+ public static byte[] STATUS = "status".getBytes();
+
+ public static byte[] VERSION = "version".getBytes();
+
+ public static byte[] HTTP11 = "HTTP/1.1".getBytes();
+
+ public static byte[] OK200 = "200 OK".getBytes();
+
+
// This is a bit more complicated, to avoid multiple reads/writes.
// We'll read as much as possible - possible past frame end. This may
// cost an extra copy - or even more complexity for dealing with slices
@@ -25,18 +36,21 @@ public class SpdyFrame {
public int off = 8; // used when reading - current offset
- public int endFrame; // end of frame == size + 8
+ int endReadData; // how much has been read ( may be more or less than a frame )
// On write it is incremented.
- public int endData; // end of data in the buffer (may be past frame end)
-
+ /**
+ * end of data in the buffer.
+ */
+ public int endData;
+
// Processed data from the frame
boolean c; // for control
int version;
- private int flags;
+ int flags;
public int type;
@@ -62,7 +76,7 @@ public class SpdyFrame {
public void recyle() {
type = 0;
c = false;
- endFrame = 0;
+ endReadData = 0;
off = 8;
streamId = 0;
nvCount = 0;
@@ -76,10 +90,10 @@ public class SpdyFrame {
}
return "C" + " S=" + streamId + (flags != 0 ? " F=" + flags : "")
+ (version != 2 ? " v" + version : "") + " t=" + type
- + " L=" + endFrame + "/" + off;
+ + " L=" + endData + "/" + off;
} else {
return "D" + " S=" + streamId + (flags != 0 ? " F=" + flags : "")
- + " L=" + endFrame + "/" + off;
+ + " L=" + endData + "/" + off;
}
}
@@ -118,7 +132,7 @@ public class SpdyFrame {
}
public boolean parse() {
- endFrame = 0;
+ endData = 0;
streamId = 0;
nvCount = 0;
@@ -142,12 +156,12 @@ public class SpdyFrame {
flags = data[4] & 0xFF;
for (int i = 5; i < 8; i++) {
b0 = data[i] & 0xFF;
- endFrame = endFrame << 8 | b0;
+ endData = endData << 8 | b0;
}
// size will represent the end of the data ( header is held in same
// buffer)
- endFrame += 8;
+ endData += 8;
return true;
}
@@ -226,6 +240,37 @@ public class SpdyFrame {
nvCount++;
headerValue(buf, soff, len);
}
+
+ public void addHeader(String name, String value) {
+ byte[] nameB = name.getBytes();
+ headerName(nameB, 0, nameB.length);
+ nameB = value.getBytes();
+ headerValue(nameB, 0, nameB.length);
+ }
+
+ public void addHeader(byte[] nameB, String value) {
+ headerName(nameB, 0, nameB.length);
+ nameB = value.getBytes();
+ headerValue(nameB, 0, nameB.length);
+ }
+
+ public void addHeader(byte[] nameB, byte[] valueB) {
+ headerName(nameB, 0, nameB.length);
+ headerValue(valueB, 0, valueB.length);
+ }
+
+ public void getHeaders(Map<String, String> resHeaders) {
+ for (int i = 0; i < nvCount; i++) {
+ int len = read16();
+ String n = new String(data, off, len, SpdyStream.UTF8);
+ advance(len);
+ len = read16();
+ String v = new String(data, off, len, SpdyStream.UTF8);
+ advance(len);
+ resHeaders.put(n, v);
+ }
+ }
+
// TODO: instead of that, use byte[][]
void makeSpace(int len) {
@@ -294,11 +339,14 @@ public class SpdyFrame {
}
public int remaining() {
- return endFrame - off;
+ return endData - off;
}
public void advance(int cnt) {
off += cnt;
}
-}
\ No newline at end of file
+ public boolean isData() {
+ return !c;
+ }
+}
Modified: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyStream.java?rev=1299981&r1=1299980&r2=1299981&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyStream.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyStream.java Tue Mar 13 05:37:14 2012
@@ -17,6 +17,7 @@
package org.apache.tomcat.spdy;
import java.io.IOException;
+import java.io.PrintWriter;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -43,20 +44,44 @@ public class SpdyStream {
public SpdyFrame reqFrame;
- SpdyFrame resFrame;
+ public SpdyFrame resFrame;
- BlockingQueue<SpdyFrame> inData = new LinkedBlockingQueue<SpdyFrame>();
-
- public static final SpdyFrame END_FRAME = new SpdyFrame(16);
+ /**
+ * For blocking support.
+ */
+ protected BlockingQueue<SpdyFrame> inData = new LinkedBlockingQueue<SpdyFrame>();
- boolean finSent;
+ protected boolean finSent;
protected boolean finRcvd;
+
+ /**
+ * Dummy data frame to insert on reset / go away
+ */
+ static SpdyFrame END_FRAME;
+
+ static {
+ END_FRAME = new SpdyFrame(16);
+ END_FRAME.endData = 0;
+ END_FRAME.off = 0;
+ END_FRAME.c = false;
+ END_FRAME.flags =SpdyConnection.FLAG_HALF_CLOSE;
+ }
public SpdyStream(SpdyConnection spdy) {
this.spdy = spdy;
}
+ public void dump(PrintWriter out) {
+ if (reqFrame != null) {
+ out.println("Req: " + reqFrame);
+ }
+ if (resFrame != null) {
+ out.println("Res: " + resFrame);
+ }
+ out.println("In: " + inData.size() + (finRcvd ? " FIN":""));
+ }
+
/**
* Non-blocking, called when a data frame is received.
*
@@ -64,10 +89,11 @@ public class SpdyStream {
* buffer ( to avoid a copy ).
*/
public void onDataFrame(SpdyFrame inFrame) {
- inData.add(inFrame);
- if (inFrame.closed()) {
- finRcvd = true;
- inData.add(END_FRAME);
+ synchronized(this) {
+ inData.add(inFrame);
+ if (inFrame.closed()) {
+ finRcvd = true;
+ }
}
}
@@ -84,29 +110,61 @@ public class SpdyStream {
reqFrame = frame;
} else if (frame.type == SpdyConnection.TYPE_SYN_REPLY) {
resFrame = frame;
+ } else if (frame.type == SpdyConnection.TYPE_RST_STREAM) {
+ onReset();
}
- if (frame.isHalfClose()) {
- finRcvd = true;
+ synchronized (this) {
+ inData.add(frame);
+ if (frame.isHalfClose()) {
+ finRcvd = true;
+ }
}
}
+ /**
+ * Called on GOAWAY or reset.
+ */
+ public void onReset() {
+ finRcvd = true;
+ finSent = true;
+
+ // To unblock
+ inData.add(END_FRAME);
+ }
+
/**
* True if the channel both received and sent FIN frames.
- *
+ *
* This is tracked by the processor, to avoid extra storage in framer.
*/
public boolean isFinished() {
return finSent && finRcvd;
}
-
- public SpdyFrame getIn(long to) throws IOException {
+
+ /**
+ * Waits and return the next data frame.
+ */
+ public SpdyFrame getDataFrame(long to) throws IOException {
+ while (true) {
+ SpdyFrame res = getFrame(to);
+ if (res == null || res.isData()) {
+ return res;
+ }
+ }
+ }
+
+ /**
+ * Waits and return the next frame. First frame will be the control frame
+ */
+ public SpdyFrame getFrame(long to) throws IOException {
SpdyFrame in;
try {
- if (inData.size() == 0 && finRcvd) {
- return null;
+ synchronized (this) {
+ if (inData.size() == 0 && finRcvd) {
+ return null;
+ }
}
in = inData.poll(to, TimeUnit.MILLISECONDS);
-
if (in == END_FRAME) {
return null;
}
@@ -137,14 +195,14 @@ public class SpdyStream {
return reqFrame;
}
- public void addHeader(String name, String value) {
- byte[] nameB = name.getBytes();
- getRequest().headerName(nameB, 0, nameB.length);
- nameB = value.getBytes();
- reqFrame.headerValue(nameB, 0, nameB.length);
+ public SpdyFrame getResponse() {
+ if (resFrame == null) {
+ resFrame = spdy.getFrame(SpdyConnection.TYPE_SYN_REPLY);
+ resFrame.streamId = reqFrame.streamId;
+ }
+ return resFrame;
}
-
public synchronized void sendDataFrame(byte[] data, int start,
int length, boolean close) throws IOException {
@@ -159,12 +217,11 @@ public class SpdyStream {
// 1 tcp packet. That's the current choice, seems closer to rest of
// tomcat
- oframe.streamId = reqFrame.streamId;
if (close)
oframe.halfClose();
oframe.append(data, start, length);
- spdy.sendFrameBlocking(oframe, this);
+ spdy.send(oframe, this);
}
public void send() throws IOException {
@@ -172,8 +229,8 @@ public class SpdyStream {
}
public void send(String host, String url, String scheme, String method) throws IOException {
- addHeader("host", host);
- addHeader("url", url);
+ getRequest().addHeader("host", host);
+ getRequest().addHeader("url", url);
send(scheme, method);
}
@@ -184,13 +241,14 @@ public class SpdyStream {
// TODO: add the others
reqFrame.halfClose();
}
- addHeader("scheme", "http"); // todo
- addHeader("method", method);
- addHeader("version", "HTTP/1.1");
+ getRequest().addHeader("scheme", "http"); // todo
+ getRequest().addHeader("method", method);
+ getRequest().addHeader("version", "HTTP/1.1");
if (reqFrame.isHalfClose()) {
finSent = true;
}
- spdy.sendFrameBlocking(reqFrame, this);
+ spdy.send(reqFrame, this);
}
-}
\ No newline at end of file
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org