You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by co...@apache.org on 2010/01/13 03:07:27 UTC
svn commit: r898619 - in /tomcat/trunk/modules/tomcat-lite:
java/org/apache/tomcat/lite/http/ java/org/apache/tomcat/lite/io/
java/org/apache/tomcat/lite/service/ test/org/apache/tomcat/lite/
test/org/apache/tomcat/lite/http/ test/org/apache/tomcat/lit...
Author: costin
Date: Wed Jan 13 02:07:25 2010
New Revision: 898619
URL: http://svn.apache.org/viewvc?rev=898619&view=rev
Log:
Moved the connection pool to a top-level class and started to add more code. Still missing: evicting kept-alive connections and
queueing to limit the number of active requests per host ( and probably more ).
Started to make spdy more like a part of a http request - i.e. upgrade if supported by both ends, etc. Now load
tests seem to work - no more OOM. Due to compression, spdy is using more memory per connection; current tests don't enable compression
( it's accepted for incoming connections ).
Modified:
tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/CompressFilter.java
tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Http11Connection.java
tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpChannel.java
tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpConnector.java
tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/SpdyConnection.java
tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOBuffer.java
tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOChannel.java
tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/service/IOStatus.java
tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/TestMain.java
tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelInMemoryTest.java
tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/LiveHttp1Test.java
tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/SpdyTest.java
tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/load/LiveHttpThreadedTest.java
tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/test/watchdog/WatchdogClient.java
Modified: tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/CompressFilter.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/CompressFilter.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/CompressFilter.java (original)
+++ tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/CompressFilter.java Wed Jan 13 02:07:25 2010
@@ -38,22 +38,34 @@
long dictId;
public CompressFilter() {
- cStream = new ZStream();
- cStream.deflateInit(JZlib.Z_BEST_COMPRESSION);
-
- dStream = new ZStream();
- dStream.inflateInit();
}
public void recycle() {
+ if (cStream == null) {
+ return;
+ }
+ cStream.free();
+ cStream = null;
+ dStream.free();
+ dStream = null;
+ }
+
+ public void init() {
+ if (cStream != null) {
+ return;
+ }
// can't call: cStream.free(); - will kill the adler, NPE
-
- cStream.deflateInit(JZlib.Z_BEST_COMPRESSION);
+ cStream = new ZStream();
+ // BEST_COMRESSION results in 256Kb per Deflate
+ cStream.deflateInit(JZlib.Z_BEST_SPEED);
+
+ dStream = new ZStream();
dStream.inflateInit();
}
CompressFilter setDictionary(byte[] dict, long id) {
+ init();
this.dict = dict;
this.dictId = id;
cStream.deflateSetDictionary(dict, dict.length);
@@ -61,6 +73,7 @@
}
void compress(IOBuffer in, IOBuffer out) throws IOException {
+ init();
BBucket bb = in.popFirst();
while (bb != null) {
@@ -80,6 +93,7 @@
// TODO: only the last one needs flush
// TODO: size missmatches ?
+ init();
int flush = JZlib.Z_PARTIAL_FLUSH;
cStream.next_in = bb.array();
@@ -129,6 +143,7 @@
}
void decompress(IOBuffer in, IOBuffer out, int len) throws IOException {
+ init();
BBucket bb = in.peekFirst();
while (bb != null && len > 0) {
Modified: tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Http11Connection.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Http11Connection.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Http11Connection.java (original)
+++ tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Http11Connection.java Wed Jan 13 02:07:25 2010
@@ -12,12 +12,16 @@
import org.apache.tomcat.lite.io.BBucket;
import org.apache.tomcat.lite.io.BBuffer;
import org.apache.tomcat.lite.io.CBuffer;
+import org.apache.tomcat.lite.io.IOConnector;
+import org.apache.tomcat.lite.io.DumpChannel;
import org.apache.tomcat.lite.io.FastHttpDateFormat;
import org.apache.tomcat.lite.io.Hex;
import org.apache.tomcat.lite.io.IOBuffer;
import org.apache.tomcat.lite.io.IOChannel;
+import org.apache.tomcat.lite.io.SslChannel;
-public class Http11Connection extends HttpConnection {
+public class Http11Connection extends HttpConnection
+ implements IOConnector.ConnectedCallback {
public static final String CHUNKED = "chunked";
public static final String CLOSE = "close";
@@ -33,8 +37,7 @@
static final byte COLON = (byte) ':';
// super.net is the socket
-
- HttpChannel activeHttp;
+
boolean debug;
BBuffer line = BBuffer.wrapper();
boolean endSent = false;
@@ -74,7 +77,11 @@
}
public void beforeRequest() {
- activeHttp = null;
+ nextRequest();
+ headRecvBuf.recycle();
+ }
+
+ public void nextRequest() {
endSent = false;
keepAlive = true;
receiveBodyState.recycle();
@@ -84,7 +91,6 @@
http10 = false;
headersReceived = false;
bodyReceived = false;
- headRecvBuf.recycle();
}
public Http11Connection serverMode() {
@@ -108,7 +114,8 @@
if (headRecvBuf.get(0) == 0x80 &&
headRecvBuf.get(1) == 0x01) {
// SPDY signature ( experimental )
- switchedProtocol = new SpdyConnection(httpConnector);
+ switchedProtocol = new SpdyConnection(httpConnector,
+ remoteHost);
if (serverMode) {
switchedProtocol.serverMode = true;
}
@@ -167,9 +174,9 @@
}
@Override
- public void handleReceived(IOChannel netx) throws IOException {
+ public void dataReceived(IOBuffer netx) throws IOException {
if (switchedProtocol != null) {
- switchedProtocol.handleReceived(netx);
+ switchedProtocol.dataReceived(netx);
return;
}
//trace("handleReceived " + headersReceived);
@@ -299,7 +306,7 @@
if (http09) {
keepAlive = false;
}
- if (!net.isOpen()) {
+ if (net != null && !net.isOpen()) {
keepAlive = false;
}
return keepAlive;
@@ -311,8 +318,8 @@
switchedProtocol.endSendReceive(http);
return;
}
- activeHttp = null;
- if (!keepAlive()) {
+ boolean keepAlive = keepAlive();
+ if (!keepAlive) {
if (debug) {
log.info("--- Close socket, no keepalive " + net);
}
@@ -321,27 +328,16 @@
net.startSending();
}
- beforeRequest();
- return;
}
synchronized (readLock) {
- beforeRequest(); // will clear head buffer
requestCount++;
- if (serverMode) {
- if (debug) {
- log.info(">>> server socket KEEP_ALIVE " + net.getTarget() +
- " " + net + " " + net.getIn().available());
- }
+ beforeRequest();
+ httpConnector.cpool.afterRequest(http, this, true);
+
+ if (serverMode && keepAlive) {
handleReceived(net); // will attempt to read next req
- } else {
- if (debug) {
- log.info(">>> client socket KEEP_ALIVE " + net.getTarget() +
- " " + net);
- }
- httpConnector.cpool.returnChannel(this);
}
}
-
}
private void trace(String s) {
@@ -741,8 +737,6 @@
return;
}
- this.activeHttp = http;
-
// Update transfer fields based on headers.
processProtocol(http.getRequest().protocol());
updateKeepAlive(http.getRequest().getMimeHeaders(), true);
@@ -1402,5 +1396,33 @@
(bodyReceived ? " BODY " : "")
;
}
-
+
+ @Override
+ public void handleConnected(IOChannel net) throws IOException {
+
+ HttpChannel httpCh = activeHttp;
+ boolean ssl = httpCh.getRequest().isSecure();
+ if (ssl) {
+ SslChannel ch1 = new SslChannel();
+ ch1.setSslContext(httpConnector.sslConnector.getSSLContext());
+ ch1.setSink(net);
+ net.addFilterAfter(ch1);
+ net = ch1;
+ }
+ if (httpConnector.debugHttp) {
+ IOChannel ch1 = new DumpChannel("");
+ net.addFilterAfter(ch1);
+ net = ch1;
+ }
+
+ if (!net.isOpen()) {
+ httpCh.abort("Can't connect");
+ return;
+ }
+
+ setSink(net);
+
+ sendRequest(httpCh);
+ }
+
}
Modified: tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpChannel.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpChannel.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpChannel.java (original)
+++ tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpChannel.java Wed Jan 13 02:07:25 2010
@@ -316,7 +316,7 @@
if (target == null) {
return ":0"; // server mode ?
}
- return target;
+ return target.toString();
}
@@ -797,11 +797,25 @@
}
-
+ /**
+ * This method will be called when the http headers have been received -
+ * the body may or may not be available.
+ *
+ * In server mode this is equivalent with a servlet request.
+ * This is also called for http client, when the response headers
+ * are received.
+ *
+ * TODO: rename it to HttMessageReceived or something similar.
+ */
public static interface HttpService {
void service(HttpRequest httpReq, HttpResponse httpRes) throws IOException;
}
+ /**
+ * Called when both request and response bodies have been sent/
+ * received. After this call the HttpChannel will be disconnected
+ * from the http connection, which can be used for other requests.
+ */
public static interface RequestCompleted {
void handle(HttpChannel data, Object extraData) throws IOException;
}
@@ -814,11 +828,4 @@
};
- IOConnector.ConnectedCallback connectedCallback = new IOConnector.ConnectedCallback() {
- @Override
- public void handleConnected(IOChannel ch) throws IOException {
- httpConnector.handleConnected(ch, HttpChannel.this);
- }
- };
-
}
\ No newline at end of file
Modified: tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpConnector.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpConnector.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpConnector.java (original)
+++ tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpConnector.java Wed Jan 13 02:07:25 2010
@@ -44,8 +44,6 @@
public void onDestroy(HttpChannel ch, HttpConnector con) throws IOException;
}
- HttpConnectionManager conManager = new HttpConnectionManager();
-
private static Logger log = Logger.getLogger("HttpConnector");
/**
@@ -84,7 +82,7 @@
public AtomicInteger recycledChannels = new AtomicInteger();
public AtomicInteger reusedChannels = new AtomicInteger();
- public ConnectionPool cpool = new ConnectionPool();
+ public HttpConnectionPool cpool = new HttpConnectionPool(this);
// Host + context mapper.
Dispatcher dispatcher;
@@ -332,11 +330,6 @@
}
- HttpConnection newConnection() {
- return conManager.newConnection(this);
- }
-
-
private class AcceptorCallback implements IOConnector.ConnectedCallback {
@Override
public void handleConnected(IOChannel accepted) throws IOException {
@@ -346,7 +339,7 @@
public HttpConnection handleAccepted(IOChannel accepted) throws IOException {
// TODO: reuse
- HttpConnection shttp = newConnection();
+ HttpConnection shttp = cpool.accepted(accepted);
shttp.serverMode = true;
if (debugHttp) {
@@ -371,50 +364,11 @@
return this;
}
- public void handleConnected(IOChannel net, HttpChannel httpCh)
- throws IOException {
- boolean ssl = httpCh.getRequest().isSecure();
- if (ssl) {
- SslChannel ch1 = new SslChannel();
- ch1.setSslContext(sslConnector.getSSLContext());
- ch1.setSink(net);
- net.addFilterAfter(ch1);
- net = ch1;
- }
- if (debugHttp) {
- IOChannel ch1 = new DumpChannel("");
- net.addFilterAfter(ch1);
- net = ch1;
- }
-
- if (!net.isOpen()) {
- httpCh.abort("Can't connect");
- return;
- }
- HttpConnection httpStream = newConnection();
- httpStream.setSink(net);
-
- // TODO: add it to the cpool
- httpCh.setConnection(httpStream);
-
- httpStream.sendRequest(httpCh);
- }
-
- public static class HttpConnectionManager {
- public HttpConnection newConnection(HttpConnector con) {
- return new Http11Connection(con);
- }
-
- public HttpConnection getFromPool(RemoteServer t) {
- return t.connections.remove(t.connections.size() - 1);
- }
- }
-
/**
* Actual HTTP/1.1 wire protocol.
*
*/
- public static class HttpConnection extends IOChannel
+ public static abstract class HttpConnection extends IOChannel
implements DataReceivedCallback
{
protected HttpConnector httpConnector;
@@ -422,9 +376,18 @@
protected BBuffer headRecvBuf = BBuffer.allocate(8192);
protected CompressFilter compress = new CompressFilter();
+
+ protected boolean secure = false;
+
+ protected HttpConnectionPool.RemoteServer remoteHost;
+ // If set, the connection is in use ( active )
+ // null == keep alive. Changes synchronized on remoteHost
+ // before/after request
+ protected HttpChannel activeHttp;
@Override
- public void handleReceived(IOChannel ch) throws IOException {
+ public final void handleReceived(IOChannel ch) throws IOException {
+ int before = ch.getIn().available();
dataReceived(ch.getIn());
}
@@ -451,9 +414,7 @@
/**
* Incoming data.
*/
- public void dataReceived(IOBuffer iob) throws IOException {
-
- }
+ public abstract void dataReceived(IOBuffer iob) throws IOException;
/**
* Framing error, client interrupt, etc.
@@ -474,12 +435,12 @@
@Override
public IOBuffer getIn() {
- return net.getIn();
+ return net == null ? null : net.getIn();
}
@Override
public IOBuffer getOut() {
- return net.getOut();
+ return net == null ? null : net.getOut();
}
@Override
@@ -493,188 +454,22 @@
protected void outClosed(HttpChannel http) throws IOException {
}
- protected void endSendReceive(HttpChannel httpChannel) throws IOException {
- return;
- }
-
- public void withExtraBuffer(BBuffer received) {
- return;
- }
-
- }
-
- /**
- * Connections for one remote host.
- * This should't be restricted by IP:port or even hostname,
- * for example if a server has multiple IPs or LB replicas - any would work.
- */
- public static class RemoteServer {
- public ArrayList<HttpConnection> connections = new ArrayList<HttpConnection>();
- }
-
- // TODO: add timeouts, limits per host/total, expire old entries
- // TODO: discover apr and use it
-
- public class ConnectionPool {
- // visible for debugging - will be made private, with accessor
/**
- * Map from client names to socket pools.
+ * Called by HttpChannel when both input and output are fully
+ * sent/received. When this happens the request is no longer associated
+ * with the Connection, and the connection can be re-used.
+ *
+ * The channel can still be used to access the retrieved data that may
+ * still be buffered until HttpChannel.release() is called.
+ *
+ * This method will be called only once, for both succesful and aborted
+ * requests.
*/
- public Map<CharSequence, RemoteServer> hosts = new HashMap<CharSequence,
- RemoteServer>();
-
- // Statistics
- public AtomicInteger waitingSockets = new AtomicInteger();
- public AtomicInteger closedSockets = new AtomicInteger();
-
- public AtomicInteger hits = new AtomicInteger();
- public AtomicInteger misses = new AtomicInteger();
-
- public int getTargetCount() {
- return hosts.size();
- }
-
- public int getSocketCount() {
- return waitingSockets.get();
- }
-
- public int getClosedSockets() {
- return closedSockets.get();
- }
-
- public Set<CharSequence> getKeepAliveTargets() {
- return hosts.keySet();
- }
-
- /**
- * @param key host:port, or some other key if multiple hosts:ips
- * are connected to equivalent servers ( LB )
- * @param httpCh
- * @throws IOException
- */
- public HttpConnection send(HttpChannel httpCh)
- throws IOException {
- String target = httpCh.getTarget();
- HttpConnection con = null;
- // TODO: check ssl on connection - now if a second request
- // is received on a ssl connection - we just send it
- boolean ssl = httpCh.getRequest().isSecure();
-
- RemoteServer t = null;
- synchronized (hosts) {
- t = hosts.get(target);
- if (t == null) {
- misses.incrementAndGet();
- }
- }
- if (t != null) {
- synchronized (t) {
- if (t.connections.size() == 0) {
- misses.incrementAndGet();
- } else {
- con = conManager.getFromPool(t);
-
- if (!con.isOpen()) {
- con.setDataReceivedCallback(null);
- con.close();
- log.fine("Already closed " + con);
- con = null;
- misses.incrementAndGet();
- } else {
- hits.incrementAndGet();
- if (debug) {
- httpCh.trace("HTTP_CONNECT: Reuse connection " + target + " " + this);
- }
- }
- }
- }
- }
-
- if (con == null) {
- if (debug) {
- httpCh.trace("HTTP_CONNECT: New connection " + target);
- }
- String[] hostPort = target.split(":");
-
- int targetPort = ssl ? 443 : 80;
- if (hostPort.length > 1) {
- targetPort = Integer.parseInt(hostPort[1]);
- }
-
- getIOConnector().connect(hostPort[0], targetPort,
- httpCh.connectedCallback);
- } else {
- con.beforeRequest();
- httpCh.setConnection(con);
- con.sendRequest(httpCh);
- }
-
-
- return con;
- }
+ protected abstract void endSendReceive(HttpChannel httpChannel) throws IOException;
- /**
- * Must be called in IOThread for the channel
- */
- public void returnChannel(HttpConnection ch)
- throws IOException {
- CharSequence key = ch.getTarget();
- if (key == null) {
- ch.close();
- if (debug) {
- log.info("Return channel, no target ..." + key + " " + ch);
- }
- return;
- }
-
- if (!ch.isOpen()) {
- ch.close(); // make sure all closed
- if (debug) {
- log.info("Return closed channel ..." + key + " " + ch);
- }
- return;
- }
-
- RemoteServer t = null;
- synchronized (hosts) {
- t = hosts.get(key);
- if (t == null) {
- t = new RemoteServer();
- hosts.put(key, t);
- }
- }
- waitingSockets.incrementAndGet();
-
- ch.ts = System.currentTimeMillis();
- synchronized (t) {
- t.connections.add(ch);
- }
+ public void withExtraBuffer(BBuffer received) {
+ return;
}
- // Called by handleClosed
- void stopKeepAlive(IOChannel schannel) {
- CharSequence target = schannel.getTarget();
- RemoteServer t = null;
- synchronized (hosts) {
- t = hosts.get(target);
- if (t == null) {
- return;
- }
- }
- synchronized (t) {
- if (t.connections.remove(schannel)) {
- waitingSockets.decrementAndGet();
- if (t.connections.size() == 0) {
- hosts.remove(target);
- }
- }
- }
- }
- }
-
- public HttpConnector withConnectionManager(
- HttpConnectionManager connectionManager) {
- this.conManager = connectionManager;
- return this;
}
}
Modified: tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/SpdyConnection.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/SpdyConnection.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/SpdyConnection.java (original)
+++ tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/http/SpdyConnection.java Wed Jan 13 02:07:25 2010
@@ -3,18 +3,26 @@
package org.apache.tomcat.lite.http;
import java.io.IOException;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.tomcat.lite.http.HttpConnector.HttpConnection;
-import org.apache.tomcat.lite.http.HttpConnector.RemoteServer;
+import org.apache.tomcat.lite.http.HttpConnectionPool.RemoteServer;
import org.apache.tomcat.lite.http.HttpMessage.HttpMessageBytes;
import org.apache.tomcat.lite.io.BBucket;
import org.apache.tomcat.lite.io.BBuffer;
import org.apache.tomcat.lite.io.CBuffer;
+import org.apache.tomcat.lite.io.DumpChannel;
import org.apache.tomcat.lite.io.IOBuffer;
+import org.apache.tomcat.lite.io.IOChannel;
+import org.apache.tomcat.lite.io.IOConnector;
+import org.apache.tomcat.lite.io.SslChannel;
/*
* TODO: expectations ?
@@ -27,23 +35,9 @@
* http://localhost:8802/hello
*/
-public class SpdyConnection extends HttpConnector.HttpConnection {
+public class SpdyConnection extends HttpConnector.HttpConnection
+ implements IOConnector.ConnectedCallback {
- public static class SpdyConnectionManager
- extends HttpConnector.HttpConnectionManager {
- @Override
- public HttpConnection newConnection(HttpConnector con) {
- return new SpdyConnection(con);
- }
-
- @Override
- public HttpConnection getFromPool(RemoteServer t) {
- // TODO: we may initiate multiple SPDY connections with each server
- // Sending frames is synchronized, receiving is muxed
- return t.connections.get(0);
- }
-
- }
/** Use compression for headers. Will magically turn to false
* if the first request doesn't have x8xx ( i.e. compress header )
@@ -77,11 +71,16 @@
/**
* @param spdyConnector
+ * @param remoteServer
*/
- SpdyConnection(HttpConnector spdyConnector) {
+ SpdyConnection(HttpConnector spdyConnector, RemoteServer remoteServer) {
this.httpConnector = spdyConnector;
+ this.remoteHost = remoteServer;
+ this.target = remoteServer.target;
}
+ AtomicInteger streamErrors = new AtomicInteger();
+
AtomicInteger lastInStream = new AtomicInteger();
AtomicInteger lastOutStream = new AtomicInteger();
@@ -111,9 +110,12 @@
}
@Override
- public void dataReceived(IOBuffer iob) throws IOException {
- int avail = iob.available();
- while (avail > 0) {
+ public synchronized void dataReceived(IOBuffer iob) throws IOException {
+ while (true) {
+ int avail = iob.available();
+ if (avail == 0) {
+ return;
+ }
if (currentInFrame == null) {
if (inFrameBuffer.remaining() + avail < 8) {
return;
@@ -121,12 +123,11 @@
if (inFrameBuffer.remaining() < 8) {
int headRest = 8 - inFrameBuffer.remaining();
int rd = iob.read(inFrameBuffer, headRest);
- avail -= rd;
}
currentInFrame = new SpdyConnection.Frame(); // TODO: reuse
- currentInFrame.parse(inFrameBuffer);
+ currentInFrame.parse(this, inFrameBuffer);
}
- if (avail < currentInFrame.length) {
+ if (iob.available() < currentInFrame.length) {
return;
}
// We have a full frame. Process it.
@@ -134,11 +135,21 @@
// TODO: extra checks, make sure the frame is correct and
// it consumed all data.
- avail -= currentInFrame.length;
currentInFrame = null;
}
}
+ AtomicInteger inFrames = new AtomicInteger();
+ AtomicInteger inDataFrames = new AtomicInteger();
+ AtomicInteger inSyncStreamFrames = new AtomicInteger();
+ AtomicInteger inBytes = new AtomicInteger();
+
+ AtomicInteger outFrames = new AtomicInteger();
+ AtomicInteger outDataFrames = new AtomicInteger();
+ AtomicInteger outBytes = new AtomicInteger();
+
+
+
/**
* Frame received. Must consume all data for the frame.
*
@@ -148,11 +159,14 @@
protected void onFrame(IOBuffer iob) throws IOException {
// TODO: make sure we have enough data.
lastFrame = currentInFrame;
+ inFrames.incrementAndGet();
+ inBytes.addAndGet(currentInFrame.length + 8);
if (currentInFrame.c) {
if (currentInFrame.type == SpdyConnection.Frame.TYPE_HELO) {
// receivedHello = currentInFrame;
} else if (currentInFrame.type == SpdyConnection.Frame.TYPE_SYN_STREAM) {
+ inSyncStreamFrames.incrementAndGet();
HttpChannel ch = new HttpChannel(); // TODO: reuse
ch.channelId = SpdyConnection.readInt(iob);
ch.setConnection(this);
@@ -164,15 +178,22 @@
ch.setHttpService(this.httpConnector.defaultService);
}
- channels.put(ch.channelId, ch);
-
- // pri and unused
- SpdyConnection.readShort(iob);
-
- HttpMessageBytes reqBytes = ch.getRequest().getMsgBytes();
-
- BBuffer head = processHeaders(iob, ch, reqBytes);
+ synchronized (this) {
+ channels.put(ch.channelId, ch);
+ }
+ try {
+ // pri and unused
+ SpdyConnection.readShort(iob);
+
+ HttpMessageBytes reqBytes = ch.getRequest().getMsgBytes();
+
+ processHeaders(iob, ch, reqBytes);
+ } catch (Throwable t) {
+ log.log(Level.SEVERE, "Error parsing head", t);
+ abort("Error reading headers " + t);
+ return;
+ }
ch.getRequest().processReceivedHeaders();
ch.handleHeadersReceived(ch.getRequest());
@@ -183,14 +204,24 @@
}
} else if (currentInFrame.type == SpdyConnection.Frame.TYPE_SYN_REPLY) {
int chId = SpdyConnection.readInt(iob);
- HttpChannel ch = channels.get(chId);
-
- SpdyConnection.readShort(iob);
+ HttpChannel ch;
+ synchronized (this) {
+ ch = channels.get(chId);
+ if (ch == null) {
+ abort("Channel not found");
+ }
+ }
+ try {
+ SpdyConnection.readShort(iob);
- HttpMessageBytes resBytes = ch.getResponse().getMsgBytes();
+ HttpMessageBytes resBytes = ch.getResponse().getMsgBytes();
- BBuffer head = processHeaders(iob, ch, resBytes);
-
+ BBuffer head = processHeaders(iob, ch, resBytes);
+ } catch (Throwable t) {
+ log.log(Level.SEVERE, "Error parsing head", t);
+ abort("Error reading headers " + t);
+ return;
+ }
ch.getResponse().processReceivedHeaders();
ch.handleHeadersReceived(ch.getResponse());
@@ -204,8 +235,12 @@
iob.advance(currentInFrame.length);
}
} else {
+ inDataFrames.incrementAndGet();
// data frame - part of an existing stream
- HttpChannel ch = channels.get(currentInFrame.streamId);
+ HttpChannel ch;
+ synchronized (this) {
+ ch = channels.get(currentInFrame.streamId);
+ }
if (ch == null) {
log.warning("Unknown stream ");
net.close();
@@ -215,9 +250,14 @@
int len = currentInFrame.length;
while (len > 0) {
BBucket bb = iob.peekFirst();
+ if (bb == null) {
+ // we should have all data
+ abort("Unexpected short read");
+ return;
+ }
if (len > bb.remaining()) {
ch.getIn().append(bb);
- len += bb.remaining();
+ len -= bb.remaining();
bb.position(bb.limit());
} else {
ch.getIn().append(bb, len);
@@ -228,19 +268,31 @@
ch.sendHandleReceivedCallback();
if ((currentInFrame.flags & SpdyConnection.Frame.FLAG_HALF_CLOSE) != 0) {
- ch.getIn().close();
ch.handleEndReceive();
}
}
firstFrame = false;
}
+
+ /**
+ * On frame error.
+ */
+ private void abort(String msg) throws IOException {
+ streamErrors.incrementAndGet();
+ for (HttpChannel ch : channels.values()) {
+ ch.abort(msg);
+ }
+ close();
+ }
private BBuffer processHeaders(IOBuffer iob, HttpChannel ch,
HttpMessageBytes reqBytes) throws IOException {
- int res = iob.peek() & 0xFF;
int nvCount = 0;
- if (firstFrame && (res & 0x0F) != 8) {
- headerCompression = false;
+ if (firstFrame) {
+ int res = iob.peek() & 0xFF;
+ if ((res & 0x0F) != 8) {
+ headerCompression = false;
+ }
}
headRecvBuf.recycle();
if (headerCompression) {
@@ -257,7 +309,11 @@
} else {
nvCount = readShort(iob);
// 8 = stream Id (4) + pri/unused (2) + nvCount (2)
- iob.read(headRecvBuf, currentInFrame.length - 8);
+ // we know we have enough data
+ int rd = iob.read(headRecvBuf, currentInFrame.length - 8);
+ if (rd != currentInFrame.length - 8) {
+ abort("Unexpected incomplete read");
+ }
}
// Wrapper - so we don't change position in head
headRecvBuf.wrapTo(headW);
@@ -268,6 +324,9 @@
for (int i = 0; i < nvCount; i++) {
int nameLen = SpdyConnection.readShort(headW);
+ if (nameLen > headW.remaining()) {
+ abort("Name too long");
+ }
nameBuf.setBytes(headW.array(), headW.position(),
nameLen);
@@ -300,18 +359,23 @@
}
@Override
- protected void sendRequest(HttpChannel http) throws IOException {
+ protected synchronized void sendRequest(HttpChannel http) throws IOException {
if (serverMode) {
throw new IOException("Only in client mode");
}
-
+ if (!checkConnection(http)) {
+ return;
+ }
MultiMap mimeHeaders = http.getRequest().getMimeHeaders();
BBuffer headBuf = BBuffer.allocate();
SpdyConnection.appendShort(headBuf, mimeHeaders.size() + 3);
serializeMime(mimeHeaders, headBuf);
-
+
+ if (headerCompression) {
+ }
+
// TODO: url - with host prefix , method
// optimize...
SpdyConnection.appendAsciiHead(headBuf, "version");
@@ -332,6 +396,11 @@
out.putByte(0x00);
out.putByte(0x01);
+ CBuffer method = http.getRequest().method();
+ if (method.equals("GET") || method.equals("HEAD")) {
+ http.getOut().close();
+ }
+
if (http.getOut().isAppendClosed()) {
out.putByte(0x01); // closed
} else {
@@ -348,17 +417,32 @@
http.channelId = 2 * lastOutStream.incrementAndGet() + 1;
}
SpdyConnection.appendInt(out, http.channelId);
-
- channels.put(http.channelId, http);
+ http.setConnection(this);
+
+ synchronized (this) {
+ channels.put(http.channelId, http);
+ }
out.putByte(0x00); // no priority
out.putByte(0x00);
sendFrame(out, headBuf);
+ if (http.outMessage.state == HttpMessage.State.HEAD) {
+ http.outMessage.state = HttpMessage.State.BODY_DATA;
+ }
+ if (http.getOut().isAppendClosed()) {
+ http.handleEndSent();
+ }
+
// Any existing data
//sendData(http);
}
+
+
+ public synchronized Collection<HttpChannel> getActives() {
+ return channels.values();
+ }
@Override
protected synchronized void sendResponseHeaders(HttpChannel http) throws IOException {
@@ -415,7 +499,8 @@
// It seems piggibacking data is not allowed
frameHead.putByte(0x00);
- SpdyConnection.append24(frameHead, headBuf.remaining() + 6);
+ int len = headBuf.remaining() + 6;
+ SpdyConnection.append24(frameHead, len);
// // Stream-Id, unused
SpdyConnection.appendInt(frameHead, http.channelId);
@@ -474,11 +559,14 @@
if (net == null) {
return; // unit test
}
+ outBytes.addAndGet(out.remaining());
net.getOut().append(out);
if (headBuf != null) {
net.getOut().append(headBuf);
+ outBytes.addAndGet(headBuf.remaining());
}
net.startSending();
+ outFrames.incrementAndGet();
}
public synchronized void sendDataFrame(IOBuffer out2, int avail,
@@ -497,11 +585,14 @@
// TODO: chunk if too much data ( at least at 24 bits)
SpdyConnection.append24(outFrameBuffer, avail);
+ outBytes.addAndGet(outFrameBuffer.remaining() + avail);
net.getOut().append(outFrameBuffer);
+
if (avail > 0) {
net.getOut().append(out2, avail);
}
net.startSending();
+ outDataFrames.incrementAndGet();
}
static void appendInt(BBuffer headBuf, int length) throws IOException {
@@ -579,9 +670,11 @@
static int FLAG_HALF_CLOSE = 1;
- public void parse(BBuffer iob) throws IOException {
+ public void parse(SpdyConnection spdyConnection,
+ BBuffer iob) throws IOException {
int b0 = iob.read();
if (b0 < 128) {
+ // data frame
c = false;
streamId = b0;
for (int i = 0; i < 3; i++) {
@@ -592,6 +685,10 @@
c = true;
b0 -= 128;
version = ((b0 << 8) | iob.read());
+ if (version != 1) {
+ spdyConnection.abort("Wrong version");
+ return;
+ }
b0 = iob.read();
type = ((b0 << 8) | iob.read());
}
@@ -601,12 +698,23 @@
b0 = iob.read();
length = length << 8 | b0;
}
-
+
iob.recycle();
}
}
+ @Override
+ protected void endSendReceive(HttpChannel http) throws IOException {
+ synchronized (this) {
+ HttpChannel doneHttp = channels.remove(http.channelId);
+ if (doneHttp != http) {
+ log.severe("Error removing " + doneHttp + " " + http);
+ }
+ }
+ httpConnector.cpool.afterRequest(http, this, true);
+ }
+
/**
* Framing error, client interrupt, etc.
*/
@@ -615,5 +723,86 @@
}
+
+ volatile boolean connecting = false;
+ volatile boolean connected = false;
+
+
+ private boolean checkConnection(HttpChannel http) throws IOException {
+ synchronized(this) {
+ if (net == null || !isOpen()) {
+ connected = false;
+ }
+
+ if (!connected) {
+ if (!connecting) {
+ // TODO: secure set at start ?
+ connecting = true;
+ httpConnector.cpool.httpConnect(http,
+ target.toString(),
+ http.getRequest().isSecure(), this);
+ }
+
+ synchronized (remoteHost) {
+ remoteHost.pending.add(http);
+ httpConnector.cpool.queued.incrementAndGet();
+ }
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public void handleConnected(IOChannel net) throws IOException {
+ HttpChannel httpCh = null;
+ if (!net.isOpen()) {
+ while (true) {
+ synchronized (remoteHost) {
+ if (remoteHost.pending.size() == 0) {
+ return;
+ }
+ httpCh = remoteHost.pending.remove();
+ }
+ httpCh.abort("Can't connect");
+ }
+ }
+
+ synchronized (remoteHost) {
+ httpCh = remoteHost.pending.peek();
+ }
+ secure = httpCh.getRequest().isSecure();
+ if (secure) {
+ SslChannel ch1 = new SslChannel();
+ ch1.setSslContext(httpConnector.sslConnector.getSSLContext());
+ ch1.setSink(net);
+ net.addFilterAfter(ch1);
+ net = ch1;
+ }
+
+ if (httpConnector.debugHttp) {
+ IOChannel ch1 = new DumpChannel("");
+ net.addFilterAfter(ch1);
+ net = ch1;
+ }
+
+ setSink(net);
+
+ synchronized(this) {
+ connecting = false;
+ connected = true;
+ }
+
+ while (true) {
+ synchronized (remoteHost) {
+ if (remoteHost.pending.size() == 0) {
+ return;
+ }
+ httpCh = remoteHost.pending.remove();
+ }
+ sendRequest(httpCh);
+ }
+ }
}
\ No newline at end of file
Modified: tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOBuffer.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOBuffer.java (original)
+++ tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOBuffer.java Wed Jan 13 02:07:25 2010
@@ -348,18 +348,25 @@
}
public int read(byte[] buf, int off, int len) throws IOException {
- BBucket bucket = peekFirst();
if (isClosedAndEmpty()) {
return -1;
}
- if (bucket == null) {
- return 0;
+ int rd = 0;
+ while (true) {
+ BBucket bucket = peekFirst();
+ if (bucket == null) {
+ return rd;
+ }
+ int toCopy = Math.min(len, bucket.remaining());
+ System.arraycopy(bucket.array(), bucket.position(),
+ buf, off + rd, toCopy);
+ bucket.position(bucket.position() + toCopy);
+ rd += toCopy;
+ len -= toCopy;
+ if (len == 0) {
+ return rd;
+ }
}
- int toCopy = Math.min(len, bucket.remaining());
- System.arraycopy(bucket.array(), bucket.position(), buf,
- off, toCopy);
- bucket.position(bucket.position() + toCopy);
- return toCopy;
}
Modified: tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOChannel.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOChannel.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOChannel.java (original)
+++ tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOChannel.java Wed Jan 13 02:07:25 2010
@@ -27,7 +27,7 @@
protected IOChannel app;
protected String id;
- protected String target;
+ protected CharSequence target;
protected IOConnector connector;
@@ -136,7 +136,7 @@
shutdownOutput();
// Should it read the buffers ?
- if (getIn().isAppendClosed()) {
+ if (getIn() == null || getIn().isAppendClosed()) {
return;
} else {
getIn().close();
@@ -146,11 +146,13 @@
}
public boolean isOpen() {
- return !getIn().isAppendClosed() && !getOut().isAppendClosed();
+ return getIn() != null &&
+ getOut() != null &&
+ !getIn().isAppendClosed() && !getOut().isAppendClosed();
}
public void shutdownOutput() throws IOException {
- if (getOut().isAppendClosed()) {
+ if (getOut() == null || getOut().isAppendClosed()) {
return;
} else {
getOut().close();
@@ -290,5 +292,8 @@
return target;
}
+ public void setTarget(CharSequence target) {
+ this.target = target;
+ }
}
Modified: tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/service/IOStatus.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/service/IOStatus.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/service/IOStatus.java (original)
+++ tomcat/trunk/modules/tomcat-lite/java/org/apache/tomcat/lite/service/IOStatus.java Wed Jan 13 02:07:25 2010
@@ -3,14 +3,15 @@
package org.apache.tomcat.lite.service;
import java.io.IOException;
-import java.util.Map;
+import java.util.List;
+import org.apache.tomcat.lite.http.HttpConnectionPool;
import org.apache.tomcat.lite.http.HttpRequest;
import org.apache.tomcat.lite.http.HttpResponse;
import org.apache.tomcat.lite.http.HttpWriter;
import org.apache.tomcat.lite.http.HttpChannel.HttpService;
-import org.apache.tomcat.lite.http.HttpConnector.ConnectionPool;
-import org.apache.tomcat.lite.http.HttpConnector.RemoteServer;
+import org.apache.tomcat.lite.http.HttpConnectionPool.RemoteServer;
+import org.apache.tomcat.lite.http.HttpConnector.HttpConnection;
import org.apache.tomcat.lite.io.IOChannel;
/**
@@ -18,30 +19,32 @@
*/
public class IOStatus implements HttpService {
- private ConnectionPool pool;
+ private HttpConnectionPool pool;
- public IOStatus(ConnectionPool pool) {
+ public IOStatus(HttpConnectionPool pool) {
this.pool = pool;
}
@Override
public void service(HttpRequest httpReq, HttpResponse httpRes)
throws IOException {
- ConnectionPool sc = pool;
+ HttpConnectionPool sc = pool;
HttpWriter out = httpRes.getBodyWriter();
httpRes.setContentType("text/plain");
+ // TODO: use JMX/DynamicObject to get all public info
out.println("hosts=" + sc.getTargetCount());
out.println("waiting=" + sc.getSocketCount());
out.println("closed=" + sc.getClosedSockets());
out.println();
- for (Map.Entry<CharSequence, RemoteServer> e: sc.hosts.entrySet()) {
- out.append(e.getKey());
+ for (RemoteServer remote: sc.getServers()) {
+ out.append(remote.target);
out.append("=");
- out.println(Integer.toString(e.getValue().connections.size()));
+ List<HttpConnection> connections = remote.getConnections();
+ out.println(Integer.toString(connections.size()));
- for (IOChannel ch: e.getValue().connections) {
+ for (IOChannel ch: connections) {
out.println(ch.getId() +
" " + ch.toString());
}
Modified: tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/TestMain.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/TestMain.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/TestMain.java (original)
+++ tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/TestMain.java Wed Jan 13 02:07:25 2010
@@ -233,7 +233,7 @@
URL url = new URL(path);
HttpURLConnection connection =
(HttpURLConnection) url.openConnection();
- // connection.setReadTimeout(100000);
+ connection.setReadTimeout(10000);
connection.connect();
int rc = connection.getResponseCode();
InputStream is = connection.getInputStream();
Modified: tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelInMemoryTest.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelInMemoryTest.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelInMemoryTest.java (original)
+++ tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelInMemoryTest.java Wed Jan 13 02:07:25 2010
@@ -27,7 +27,7 @@
/**
* Last http channel created by the connection
*/
- HttpChannel http;
+ volatile HttpChannel http;
// Input/output for the connection
MemoryIOConnector.MemoryIOChannel net = new MemoryIOChannel();
Modified: tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/LiveHttp1Test.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/LiveHttp1Test.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/LiveHttp1Test.java (original)
+++ tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/LiveHttp1Test.java Wed Jan 13 02:07:25 2010
@@ -86,7 +86,7 @@
httpClient.requestURI().set("/chunked/foo");
httpClient.send();
httpClient.readAll(bodyRecvBuffer, to);
- assertTrue(bodyRecvBuffer.toString().indexOf("AAA") >= 0);
+ assertTrue(bodyRecvBuffer.toString(), bodyRecvBuffer.toString().indexOf("AAA") >= 0);
}
// Check waitResponseHead()
Modified: tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/SpdyTest.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/SpdyTest.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/SpdyTest.java (original)
+++ tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/http/SpdyTest.java Wed Jan 13 02:07:25 2010
@@ -9,17 +9,15 @@
import junit.framework.TestCase;
import org.apache.tomcat.lite.TestMain;
+import org.apache.tomcat.lite.http.HttpConnectionPool.RemoteServer;
import org.apache.tomcat.lite.io.IOBuffer;
-import org.apache.tomcat.lite.http.SpdyConnection.SpdyConnectionManager;
public class SpdyTest extends TestCase {
HttpConnector http11Con = TestMain.shared().getClient();
- static HttpConnector spdyCon = DefaultHttpConnector.get()
- .withConnectionManager(new SpdyConnectionManager());
+ static HttpConnector spdyCon = DefaultHttpConnector.get();
- HttpConnector memSpdyCon =
- new HttpConnector(null).withConnectionManager(new SpdyConnectionManager());
+ HttpConnector memSpdyCon = new HttpConnector(null);
public void testClient() throws IOException {
HttpRequest req =
@@ -43,7 +41,7 @@
IOBuffer iob = new IOBuffer();
iob.append(is);
- SpdyConnection con = (SpdyConnection) memSpdyCon.newConnection();
+ SpdyConnection con = new SpdyConnection(memSpdyCon, new RemoteServer());
// By default it has a dispatcher built-in
con.serverMode = true;
@@ -72,7 +70,7 @@
IOBuffer iob = new IOBuffer();
iob.append(is);
- SpdyConnection con = (SpdyConnection) memSpdyCon.newConnection();
+ SpdyConnection con = new SpdyConnection(memSpdyCon, new RemoteServer());
// By default it has a dispatcher built-in
con.serverMode = true;
@@ -95,8 +93,8 @@
public void testLargeInt() throws Exception {
IOBuffer iob = new IOBuffer();
- iob.append(0xFF);
- iob.append(0xFF);
+ iob.append(0x80);
+ iob.append(0x01);
iob.append(0xFF);
iob.append(0xFF);
@@ -105,12 +103,34 @@
iob.append(0xFF);
iob.append(0xFF);
- SpdyConnection con = (SpdyConnection) memSpdyCon.newConnection();
+ SpdyConnection con = new SpdyConnection(memSpdyCon, new RemoteServer());
con.dataReceived(iob);
- assertEquals(0x7FFF, con.currentInFrame.version);
+ assertEquals(1, con.currentInFrame.version);
assertEquals(0xFFFF, con.currentInFrame.type);
assertEquals(0xFF, con.currentInFrame.flags);
assertEquals(0xFFFFFF, con.currentInFrame.length);
}
+
+ // A frame header with a wrong version must be counted as a stream error
+ public void testBad() throws Exception {
+
+ IOBuffer iob = new IOBuffer();
+ iob.append(0xFF);
+ iob.append(0xFF);
+ iob.append(0xFF);
+ iob.append(0xFF);
+
+ iob.append(0xFF);
+ iob.append(0xFF);
+ iob.append(0xFF);
+ iob.append(0xFF);
+
+ SpdyConnection con = new SpdyConnection(memSpdyCon, new RemoteServer());
+ con.dataReceived(iob);
+
+ assertEquals(1, con.streamErrors.get());
+
+ }
+
}
Modified: tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/load/LiveHttpThreadedTest.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/load/LiveHttpThreadedTest.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/load/LiveHttpThreadedTest.java (original)
+++ tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/lite/load/LiveHttpThreadedTest.java Wed Jan 13 02:07:25 2010
@@ -68,175 +68,184 @@
* it seems there is a bug as well.
*/
public class LiveHttpThreadedTest extends TestCase {
- HttpConnector clientCon = TestMain.shared().getClient();
- ThreadRunner tr;
- static MBeanServer server;
-
- AtomicInteger ok = new AtomicInteger();
- Object lock = new Object();
- int reqCnt;
-
- Map<HttpRequest, HttpRequest> active = new HashMap();
-
- public void test1000Async() throws Exception {
- try {
- asyncRequest(10, 100, false);
- } finally {
- dumpHeap("heapAsync.bin");
- }
-
- }
-
- public void test10000Async() throws Exception {
- try {
- asyncRequest(20, 500, false);
- } finally {
- dumpHeap("heapAsync.bin");
- }
- }
-
- public void xtest1000AsyncSpdy() throws Exception {
- try {
- asyncRequest(10, 20, true);
- } finally {
- dumpHeap("heapAsync.bin");
- }
-
- }
-
- public void xtest10000AsyncSpdy() throws Exception {
- try {
- asyncRequest(20, 500, true);
- } finally {
- dumpHeap("heapAsync.bin");
- }
- }
-
- public void asyncRequest(int thr, int perthr, boolean spdy) throws Exception {
- reqCnt = thr * perthr;
- if (spdy) {
- // TODO: simpler API ( 'allowSpdy', etc ) - after negotiation is impl
- clientCon.withConnectionManager(new SpdyConnection.SpdyConnectionManager());
- }
- long t0 = System.currentTimeMillis();
- tr = new ThreadRunner(thr, perthr) {
- public void makeRequest(int i) throws Exception {
- HttpRequest cstate = clientCon.request("localhost", 8802);
- synchronized (active) {
- active.put(cstate, cstate);
- }
-
- cstate.requestURI().set("/hello");
- cstate.setCompletedCallback(reqCallback);
-
- // Send the request, wait response
- Thread.currentThread().sleep(20);
- cstate.send();
+ HttpConnector clientCon = TestMain.shared().getClient();
+ HttpConnector serverCon = TestMain.shared().getTestServer();
+ ThreadRunner tr;
+ static MBeanServer server;
+
+ AtomicInteger ok = new AtomicInteger();
+ Object lock = new Object();
+ int reqCnt;
+
+ Map<HttpRequest, HttpRequest> active = new HashMap();
+
+ public void tearDown() throws IOException {
+ clientCon.cpool.clear();
+ }
+
+ public void test1000Async() throws Exception {
+ try {
+ asyncRequest(10, 100, false);
+ } finally {
+ dumpHeap("heapAsync.bin");
+ }
+
+ }
+
+ public void test10000Async() throws Exception {
+ try {
+ asyncRequest(20, 500, false);
+ } finally {
+ dumpHeap("heapAsyncBig.bin");
+ }
+ }
+
+ public void test1000AsyncSpdy() throws Exception {
+ try {
+ asyncRequest(10, 100, true);
+ } finally {
+ dumpHeap("heapSpdy1000.bin");
+ }
+
+ }
+
+ public void test10000AsyncSpdy() throws Exception {
+ try {
+ asyncRequest(20, 500, true);
+ } finally {
+ dumpHeap("heapSpdy10000.bin");
+ }
+ }
+
+ public void asyncRequest(int thr, int perthr,
+ final boolean spdy) throws Exception {
+ reqCnt = thr * perthr;
+ long t0 = System.currentTimeMillis();
+ tr = new ThreadRunner(thr, perthr) {
+ public void makeRequest(int i) throws Exception {
+ HttpRequest cstate = clientCon.request("localhost", 8802);
+ synchronized (active) {
+ active.put(cstate, cstate);
+ }
+ if (spdy) {
+ // Magic way to force spdy - will be replaced with
+ // a negotiation.
+ cstate.setProtocol("SPDY/1.0");
+ }
+ cstate.requestURI().set("/hello");
+ cstate.setCompletedCallback(reqCallback);
+ // no body
+ cstate.getBody().close();
+ // Send the request, wait response
+ Thread.currentThread().sleep(20);
+ cstate.send();
}
- };
- tr.run();
- assertEquals(0, tr.errors.get());
- synchronized (lock) {
- lock.wait(reqCnt * 100);
- }
- assertEquals(reqCnt, ok.get());
- System.err.println(reqCnt + " Async requests: " + (System.currentTimeMillis() - t0));
- }
-
- public void testURLRequest() throws Exception {
- urlRequest(10, 100);
- }
-
- public void testURLRequest2() throws Exception {
- urlRequest(20, 500);
-
- }
-
- /**
- * HttpURLConnection client against lite.http server.
- */
- public void urlRequest(int thr, int cnt) throws Exception {
- long t0 = System.currentTimeMillis();
-
-
- try {
- HttpConnector testServer = TestMain.getTestServer();
-
- tr = new ThreadRunner(thr, cnt) {
-
- public void makeRequest(int i) throws Exception {
- try {
- ByteChunk out = new ByteChunk();
- HttpURLConnection con = TestMain.getUrl("http://localhost:8802/hello", out);
- if (con.getResponseCode() != 200) {
- errors.incrementAndGet();
- }
- if (!"Hello world".equals(out.toString())) {
- errors.incrementAndGet();
- System.err.println("bad result " + out);
- }
- } catch(Throwable t) {
- t.printStackTrace();
- errors.incrementAndGet();
- }
- }
- };
- tr.run();
- assertEquals(0, tr.errors.get());
-
- System.err.println(thr + " threads, " + (thr * cnt) + " total blocking URL requests: " +
- (System.currentTimeMillis() - t0));
-
- //assertEquals(testServer., actual)
- } finally {
- dumpHeap("heapURLReq.bin");
- }
- }
-
- // TODO: move to a servlet
- private void dumpHeap(String file) throws InstanceNotFoundException,
- MBeanException, ReflectionException, MalformedObjectNameException {
-
- if (server == null) {
- server = ManagementFactory.getPlatformMBeanServer();
-
- }
- File f1 = new java.io.File(file);
- if (f1.exists()) {
- f1.delete();
- }
- server.invoke(new ObjectName("com.sun.management:type=HotSpotDiagnostic"),
- "dumpHeap",
- new Object[] {file, Boolean.FALSE /* live */},
- new String[] {String.class.getName(), "boolean"});
- }
-
-
- RequestCompleted reqCallback = new RequestCompleted() {
- @Override
- public void handle(HttpChannel data, Object extraData)
- throws IOException {
- String out = data.getIn().copyAll(null).toString();
- if (200 != data.getResponse().getStatus()) {
- System.err.println("Wrong status");
- tr.errors.incrementAndGet();
- }
- if (!"Hello world".equals(out)) {
- tr.errors.incrementAndGet();
- System.err.println("bad result " + out);
- }
- synchronized (active) {
- active.remove(data.getRequest());
- }
- data.release();
- int okres = ok.incrementAndGet();
- if (okres >= reqCnt) {
- synchronized (lock) {
- lock.notify();
+ };
+ tr.run();
+ assertEquals(0, tr.errors.get());
+ synchronized (lock) {
+ if (ok.get() < reqCnt) {
+ lock.wait(reqCnt * 100);
}
}
+ assertEquals(reqCnt, ok.get());
+ System.err.println(reqCnt + " Async requests: " + (System.currentTimeMillis() - t0));
}
- };
-
-
+
+ public void testURLRequest1000() throws Exception {
+ urlRequest(10, 100);
+ }
+
+ public void testURLRequest10000() throws Exception {
+ urlRequest(20, 500);
+
+ }
+
+ /**
+ * HttpURLConnection client against lite.http server.
+ */
+ public void urlRequest(int thr, int cnt) throws Exception {
+ long t0 = System.currentTimeMillis();
+
+
+ try {
+ HttpConnector testServer = TestMain.getTestServer();
+
+ tr = new ThreadRunner(thr, cnt) {
+
+ public void makeRequest(int i) throws Exception {
+ try {
+ ByteChunk out = new ByteChunk();
+ HttpURLConnection con = TestMain.getUrl("http://localhost:8802/hello", out);
+ if (con.getResponseCode() != 200) {
+ errors.incrementAndGet();
+ }
+ if (!"Hello world".equals(out.toString())) {
+ errors.incrementAndGet();
+ System.err.println("bad result " + out);
+ }
+ } catch(Throwable t) {
+ t.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ };
+ tr.run();
+ assertEquals(0, tr.errors.get());
+
+ System.err.println(thr + " threads, " + (thr * cnt) + " total blocking URL requests: " +
+ (System.currentTimeMillis() - t0));
+
+ //assertEquals(testServer., actual)
+ } finally {
+ dumpHeap("heapURLReq.bin");
+ }
+ }
+
+ // TODO: move to a servlet
+ private void dumpHeap(String file) throws InstanceNotFoundException,
+ MBeanException, ReflectionException, MalformedObjectNameException {
+
+ if (server == null) {
+ server = ManagementFactory.getPlatformMBeanServer();
+
+ }
+ File f1 = new java.io.File(file);
+ if (f1.exists()) {
+ f1.delete();
+ }
+ server.invoke(new ObjectName("com.sun.management:type=HotSpotDiagnostic"),
+ "dumpHeap",
+ new Object[] {file, Boolean.FALSE /* live */},
+ new String[] {String.class.getName(), "boolean"});
+ }
+
+
+ RequestCompleted reqCallback = new RequestCompleted() {
+ @Override
+ public void handle(HttpChannel data, Object extraData)
+ throws IOException {
+ String out = data.getIn().copyAll(null).toString();
+ if (200 != data.getResponse().getStatus()) {
+ System.err.println("Wrong status");
+ tr.errors.incrementAndGet();
+ }
+ if (!"Hello world".equals(out)) {
+ tr.errors.incrementAndGet();
+ System.err.println("bad result " + out);
+ }
+ synchronized (active) {
+ active.remove(data.getRequest());
+ }
+ data.release();
+ int okres = ok.incrementAndGet();
+ if (okres >= reqCnt) {
+ synchronized (lock) {
+ lock.notify();
+ }
+ }
+ }
+ };
+
+
}
Modified: tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/test/watchdog/WatchdogClient.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/test/watchdog/WatchdogClient.java?rev=898619&r1=898618&r2=898619&view=diff
==============================================================================
--- tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/test/watchdog/WatchdogClient.java (original)
+++ tomcat/trunk/modules/tomcat-lite/test/org/apache/tomcat/test/watchdog/WatchdogClient.java Wed Jan 13 02:07:25 2010
@@ -33,7 +33,7 @@
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
-public class WatchdogClient implements Test {
+public class WatchdogClient {
protected String goldenDir;
protected String testMatch;
@@ -167,12 +167,10 @@
protected String single;
WatchdogTestCase singleTest;
- @Override
public int countTestCases() {
return 1;
}
- @Override
public void run(TestResult result) {
getSuite();
if (singleTest != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org