You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by dp...@apache.org on 2011/12/07 17:24:05 UTC
svn commit: r1211510 - in /jackrabbit/sandbox/microkernel/src:
main/java/org/apache/jackrabbit/mk/
main/java/org/apache/jackrabbit/mk/client/
main/java/org/apache/jackrabbit/mk/server/
main/java/org/apache/jackrabbit/mk/util/ test/java/org/apache/jackr...
Author: dpfister
Date: Wed Dec 7 16:24:05 2011
New Revision: 1211510
URL: http://svn.apache.org/viewvc?rev=1211510&view=rev
Log:
- make large objects in HTTP communication recyclable
- add MemorySockets (Sockets w/o network)
Added:
jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Request.java
- copied, changed from r1210840, jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Request.java
jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/MemorySockets.java (with props)
Modified:
jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/MicroKernelFactory.java
jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/Client.java
jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/HttpExecutor.java
jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/Request.java
jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/HttpProcessor.java
jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Response.java
jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Server.java
jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/ChunkedInputStream.java
jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/ChunkedOutputStream.java
jackrabbit/sandbox/microkernel/src/test/java/org/apache/jackrabbit/mk/MultiMkTestBase.java
Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/MicroKernelFactory.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/MicroKernelFactory.java?rev=1211510&r1=1211509&r2=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/MicroKernelFactory.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/MicroKernelFactory.java Wed Dec 7 16:24:05 2011
@@ -17,14 +17,11 @@
package org.apache.jackrabbit.mk;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
+
import org.apache.jackrabbit.mk.api.MicroKernel;
import org.apache.jackrabbit.mk.client.Client;
import org.apache.jackrabbit.mk.fs.FileUtils;
import org.apache.jackrabbit.mk.mem.MemoryKernelImpl;
-import org.apache.jackrabbit.mk.server.Server;
import org.apache.jackrabbit.mk.util.ExceptionFactory;
import org.apache.jackrabbit.mk.wrapper.IndexWrapper;
import org.apache.jackrabbit.mk.wrapper.LogWrapper;
@@ -76,27 +73,10 @@ public class MicroKernelFactory {
}
return new MicroKernelImpl(dir);
} else if (url.startsWith("http:")) {
- try {
- URI uri = new URI(url);
- return new Client(new InetSocketAddress(uri.getHost(), uri.getPort()));
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(e.getMessage());
- }
- } else if (url.startsWith("remote:")) {
- try {
- MicroKernel mk = getInstance(url.substring("remote:".length()));
- final Server server = new Server(mk);
- server.setPort(0);
- server.start();
- return new Client(server.getAddress()) {
- public void dispose() {
- super.dispose();
- server.stop();
- }
- };
- } catch (IOException e) {
- throw new IllegalArgumentException(e.getMessage());
- }
+ return Client.createHttpClient(url);
+ } else if (url.startsWith("http-bridge:")) {
+ MicroKernel mk = MicroKernelFactory.getInstance(url.substring("http-bridge:".length()));
+ return Client.createHttpBridge(mk);
} else {
throw new IllegalArgumentException(url);
}
Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/Client.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/Client.java?rev=1211510&r1=1211509&r2=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/Client.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/Client.java Wed Dec 7 16:24:05 2011
@@ -20,14 +20,23 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.SocketFactory;
import org.apache.jackrabbit.mk.api.MicroKernel;
import org.apache.jackrabbit.mk.api.MicroKernelException;
+import org.apache.jackrabbit.mk.server.Server;
import org.apache.jackrabbit.mk.util.IOUtils;
/**
* Client exposing a <code>MicroKernel</code> interface, that "remotes" commands
* to a server.
+ * <p/>
+ * All public methods inside this class are completely synchronized because
+ * HttpExecutor is not thread-safe.
*/
public class Client implements MicroKernel {
@@ -35,9 +44,51 @@ public class Client implements MicroKern
private final InetSocketAddress addr;
+ private final SocketFactory socketFactory;
+
+ private final AtomicBoolean disposed = new AtomicBoolean();
+
private HttpExecutor executor;
- private boolean disposed;
+ /**
+ * Create a new instance of this class, given a URL to connect to.
+ *
+ * @param url url
+ * @return micro kernel
+ */
+ public static MicroKernel createHttpClient(String url) {
+ try {
+ URI uri = new URI(url);
+ return new Client(new InetSocketAddress(uri.getHost(), uri.getPort()));
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+
+ /**
+ * Create a new instance of this class, where every request goes through an HTTP bridge
+ * before being delivered to a given micro kernel implementation.
+ *
+ * @param mk micro kernel
+ * @return bridged micro kernel
+ */
+ public static MicroKernel createHttpBridge(MicroKernel mk) {
+ final Server server = new Server(mk);
+
+ try {
+ server.start();
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+
+ return new Client(server.getAddress()) {
+ @Override
+ public synchronized void dispose() {
+ super.dispose();
+ server.stop();
+ }
+ };
+ }
/**
* Create a new instance of this class.
@@ -45,20 +96,26 @@ public class Client implements MicroKern
* @param addr socket address
*/
public Client(InetSocketAddress addr) {
- this.addr = addr;
+ this(addr, SocketFactory.getDefault());
}
+ /**
+ * Create a new instance of this class, using the given socket factory.
+ *
+ * @param addr socket address
+ */
+ public Client(InetSocketAddress addr, SocketFactory socketFactory) {
+ this.addr = addr;
+ this.socketFactory = socketFactory;
+ }
+
//-------------------------------------------------- implements MicroKernel
public synchronized void dispose() {
- if (disposed) {
+ if (!disposed.compareAndSet(false, true)) {
return;
}
- try {
- IOUtils.closeQuietly(executor);
- } finally {
- disposed = true;
- }
+ IOUtils.closeQuietly(executor);
}
public synchronized String getHeadRevision() throws MicroKernelException {
@@ -125,8 +182,7 @@ public class Client implements MicroKern
}
}
- public synchronized String diff(String fromRevisionId, String toRevisionId,
- String path)
+ public synchronized String diff(String fromRevisionId, String toRevisionId, String path)
throws MicroKernelException {
Request request = null;
@@ -186,8 +242,8 @@ public class Client implements MicroKern
}
}
- public synchronized String commit(String path, String jsonDiff, String revisionId, String message)
- throws MicroKernelException {
+ public synchronized String commit(String path, String jsonDiff, String revisionId,
+ String message) throws MicroKernelException {
Request request = null;
@@ -275,7 +331,7 @@ public class Client implements MicroKern
* @throws MicroKernelException if an exception occurs
*/
private Request createRequest(String command) throws IOException, MicroKernelException {
- if (disposed) {
+ if (disposed.get()) {
throw new IllegalStateException("This instance has already been disposed");
}
if (executor != null && !executor.isAlive()) {
@@ -289,6 +345,9 @@ public class Client implements MicroKern
}
private Socket createSocket() throws IOException {
- return new Socket(addr.getAddress(), addr.getPort());
+ if (addr == null) {
+ return socketFactory.createSocket();
+ }
+ return socketFactory.createSocket(addr.getAddress(), addr.getPort());
}
}
Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/HttpExecutor.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/HttpExecutor.java?rev=1211510&r1=1211509&r2=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/HttpExecutor.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/HttpExecutor.java Wed Dec 7 16:24:05 2011
@@ -37,6 +37,10 @@ import org.apache.jackrabbit.mk.util.IOU
/**
* Executes commands as HTTP requests.
+ * <p>
+ * This class is NOT thread-safe: its execute() method should operate within a
+ * lock, which must be held if a result input stream is returned UNTIL this
+ * stream is consumed or closed.
*/
class HttpExecutor implements Closeable {
@@ -46,8 +50,12 @@ class HttpExecutor implements Closeable
private OutputStream socketOut;
+ private final ChunkedOutputStream bodyOut = new ChunkedOutputStream(null);
+
+ private final ChunkedInputStream bodyIn = new ChunkedInputStream(null);
+
private boolean connectionClosed;
-
+
/**
* Create a new instance of this class.
*
@@ -84,7 +92,7 @@ class HttpExecutor implements Closeable
writeLine("Transfer-Encoding: chunked");
writeLine("");
- OutputStream bodyOut = new ChunkedOutputStream(socketOut);
+ bodyOut.recycle(socketOut);
if (in != null) {
String boundary = getBoundary();
@@ -143,7 +151,8 @@ class HttpExecutor implements Closeable
String encoding = headers.get("Transfer-Encoding");
if ("chunked".equalsIgnoreCase(encoding)) {
- reqIn = new ChunkedInputStream(socketIn);
+ bodyIn.recycle(socketIn);
+ reqIn = bodyIn;
} else {
int contentLength = -1;
@@ -181,10 +190,18 @@ class HttpExecutor implements Closeable
}
}
+ /**
+ * Return a flag indicating whether the executor is alive.
+ *
+ * @return <code>true</code> if it is alive; <code>false</code> otherwise
+ */
public boolean isAlive() {
return !connectionClosed && !socket.isClosed();
}
+ /**
+ * Close this executor.
+ */
public void close() {
IOUtils.closeQuietly(socketOut);
IOUtils.closeQuietly(socketIn);
Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/Request.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/Request.java?rev=1211510&r1=1211509&r2=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/Request.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/Request.java Wed Dec 7 16:24:05 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jackrabbit.mk.util.IOUtils;
@@ -41,7 +42,7 @@ class Request implements Closeable {
private InputStream resultIn;
- private boolean executed;
+ private final AtomicBoolean executed = new AtomicBoolean();
/**
* Create a new instance of this class.
@@ -104,14 +105,10 @@ class Request implements Closeable {
* @throws IOException if an I/O error occurs
*/
public void execute() throws IOException {
- if (executed) {
+ if (!executed.compareAndSet(false, true)) {
return;
}
- try {
- resultIn = executor.execute(command, params, in);
- } finally {
- executed = true;
- }
+ resultIn = executor.execute(command, params, in);
}
/**
Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/HttpProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/HttpProcessor.java?rev=1211510&r1=1211509&r2=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/HttpProcessor.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/HttpProcessor.java Wed Dec 7 16:24:05 2011
@@ -34,7 +34,7 @@ class HttpProcessor {
private static final int DEFAULT_SO_TIMEOUT = 30000;
- private static final int MAX_KEEP_ALIVE_REQUESTS = 30;
+ private static final int MAX_KEEP_ALIVE_REQUESTS = 100;
private final Socket socket;
@@ -43,6 +43,10 @@ class HttpProcessor {
private InputStream socketIn;
private OutputStream socketOut;
+
+ private final Request request = new Request();
+
+ private final Response response = new Response();
/**
* Create a new instance of this class.
@@ -92,11 +96,8 @@ class HttpProcessor {
* @throws IOException if an I/O error occurs
*/
private boolean process(int requestNum) throws IOException {
- Request request = null;
- Response response = null;
-
try {
- request = Request.parse(socketIn);
+ request.parse(socketIn);
} catch (IOException e) {
if (requestNum == 0) {
// ignore errors on the very first request (might be wrong protocol)
@@ -107,7 +108,7 @@ class HttpProcessor {
try {
boolean keepAlive = request.isKeepAlive() &&
(requestNum + 1 < MAX_KEEP_ALIVE_REQUESTS);
- response = new Response(socketOut, keepAlive);
+ response.recycle(socketOut, keepAlive);
servlet.service(request, response);
return keepAlive;
} finally {
Copied: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Request.java (from r1210840, jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Request.java)
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Request.java?p2=jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Request.java&p1=jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Request.java&r1=1210840&r2=1211510&rev=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Request.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Request.java Wed Dec 7 16:24:05 2011
@@ -22,7 +22,6 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URLDecoder;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -39,58 +38,54 @@ class Request implements Closeable {
private InputStream in;
- private final String method;
+ private String method;
- private final String file;
-
- private final String protocol;
+ private String file;
private String queryString;
- private final Map<String,String> headers;
+ private String protocol;
+
+ private final Map<String,String> headers = new LinkedHashMap<String,String>();
- private Map<String, String> params;
+ private boolean paramsChecked;
+ private final Map<String, String> params = new LinkedHashMap<String,String>();
+
+ private final ChunkedInputStream chunkedIn = new ChunkedInputStream(null);
+
private InputStream reqIn;
/**
- * Create a new instance of this class.
+ * Parse a request. This automatically resets any internal state, so it can be
+ * used multiple times.
*
- * @param method HTTP method
- * @param uri target URI
- * @param headers request headers
- * @param in request body
+ * @param in input stream
+ * @throws IOException if an I/O error occurs
*/
- private Request(String method, String uri, String protocol,
- Map<String,String> headers, InputStream in) {
- this.method = method;
+ void parse(InputStream in) throws IOException {
+ String requestLine = readLine(in);
+
+ String[] parts = requestLine.split(" ");
+ if (parts.length != 3) {
+ String msg = String.format("Bad HTTP request line: %s", requestLine);
+ throw new IOException(msg);
+ }
+ method = parts[0];
+ String uri = parts[1];
int index = uri.lastIndexOf('?');
if (index == -1) {
file = uri;
+ queryString = null;
} else {
file = uri.substring(0, index);
queryString = uri.substring(index + 1);
}
- this.protocol = protocol;
- this.headers = headers;
- this.in = in;
- }
-
- public static Request parse(InputStream in) throws IOException {
- String requestLine = readLine(in);
-
- String[] parts = requestLine.split(" ");
- if (parts.length != 3) {
- String msg = String.format("Bad HTTP request line: %s", requestLine);
- throw new IOException(msg);
- }
- String method = parts[0];
- String uri = parts[1];
- String protocol = parts[2];
+ protocol = parts[2];
- Map<String, String> headers = new LinkedHashMap<String, String>();
+ headers.clear();
for (;;) {
String headerLine = readLine(in);
@@ -102,7 +97,12 @@ class Request implements Closeable {
headers.put(parts[0].trim(), parts[1].trim());
}
}
- return new Request(method, uri, protocol, headers, in);
+
+ params.clear();
+ paramsChecked = false;
+ reqIn = null;
+
+ this.in = in;
}
/**
@@ -170,14 +170,16 @@ class Request implements Closeable {
}
public String getParameter(String name) throws IOException {
- if (params == null) {
- params = new HashMap<String, String>();
-
- String contentType = getContentType();
- if ("application/x-www-form-urlencoded".equals(contentType)) {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- IOUtils.copy(getInputStream(), out);
- collectParameters(out.toString(), params);
+ if (!paramsChecked) {
+ try {
+ String contentType = getContentType();
+ if ("application/x-www-form-urlencoded".equals(contentType)) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ IOUtils.copy(getInputStream(), out);
+ collectParameters(out.toString(), params);
+ }
+ } finally {
+ paramsChecked = true;
}
}
return params.get(name);
@@ -251,7 +253,8 @@ class Request implements Closeable {
if (reqIn == null) {
String encoding = headers.get("Transfer-Encoding");
if ("chunked".equalsIgnoreCase(encoding)) {
- reqIn = new ChunkedInputStream(in);
+ chunkedIn.recycle(in);
+ reqIn = chunkedIn;
} else {
int contentLength = getContentLength();
if (contentLength == -1) {
@@ -269,11 +272,14 @@ class Request implements Closeable {
public void close() {
if (in != null) {
- // Consume a possibly non-empty body by triggering the
- // creation of our request input stream
- getInputStream();
- IOUtils.closeQuietly(reqIn);
- in = null;
+ try {
+ // Consume a possibly non-empty body by triggering the
+ // creation of our request input stream
+ getInputStream();
+ IOUtils.closeQuietly(reqIn);
+ } finally {
+ in = null;
+ }
}
}
}
Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Response.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Response.java?rev=1211510&r1=1211509&r2=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Response.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Response.java Wed Dec 7 16:24:05 2011
@@ -24,6 +24,8 @@ import java.util.Map;
import org.apache.jackrabbit.mk.util.IOUtils;
+import static org.apache.jackrabbit.mk.util.ChunkedInputStream.MAX_CHUNK_SIZE;
+
/**
* HTTP Response implementation.
*/
@@ -43,19 +45,28 @@ class Response implements Closeable {
private String contentType;
- private BodyOutputStream respOut;
+ private final BodyOutputStream bodyOut = new BodyOutputStream();
+
+ private OutputStream respOut;
- private Map<String,String> headers;
+ private final Map<String,String> headers = new LinkedHashMap<String, String>();
/**
- * Create a new instance of this class.
+ * Recycle this instance, using another output stream and a keep-alive flag.
*
* @param out output stream
* @param keepAlive whether to keep alive the connection
*/
- public Response(OutputStream out, boolean keepAlive) {
+ void recycle(OutputStream out, boolean keepAlive) {
this.out = out;
this.keepAlive = keepAlive;
+
+ headersSent = committed = chunked = false;
+ statusCode = 0;
+ contentType = null;
+ bodyOut.reset();
+ respOut = null;
+ headers.clear();
}
/**
@@ -106,13 +117,11 @@ class Response implements Closeable {
writeLine(String.format("HTTP/1.1 %d %s", statusCode, msg));
- if (respOut != null) {
- if (committed) {
- writeLine(String.format("Content-Length: %d", respOut.getCount()));
- } else {
- chunked = true;
- writeLine("Transfer-Encoding: chunked");
- }
+ if (committed) {
+ writeLine(String.format("Content-Length: %d", bodyOut.getCount()));
+ } else {
+ chunked = true;
+ writeLine("Transfer-Encoding: chunked");
}
if (contentType != null) {
writeLine(String.format("Content-Type: %s", contentType));
@@ -185,7 +194,7 @@ class Response implements Closeable {
public OutputStream getOutputStream() {
if (respOut == null) {
- respOut = new BodyOutputStream();
+ respOut = bodyOut;
}
return respOut;
}
@@ -195,9 +204,6 @@ class Response implements Closeable {
}
public void addHeader(String name, String value) {
- if (headers == null) {
- headers = new LinkedHashMap<String, String>();
- }
headers.put(name, value);
}
@@ -214,7 +220,7 @@ class Response implements Closeable {
* Buffer size chosen intentionally to not exceed maximum chunk
* size we'd like to transmit.
*/
- private final byte[] buf = new byte[0xFFFF];
+ private final byte[] buf = new byte[MAX_CHUNK_SIZE];
private int offset;
@@ -259,6 +265,10 @@ class Response implements Closeable {
}
}
+ public void reset() {
+ offset = 0;
+ }
+
@Override
public void close() throws IOException {
flush();
Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Server.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Server.java?rev=1211510&r1=1211509&r2=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Server.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Server.java Wed Dec 7 16:24:05 2011
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.mk.server;
import java.io.EOFException;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
@@ -27,6 +28,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import javax.net.ServerSocketFactory;
+
import org.apache.jackrabbit.mk.MicroKernelFactory;
import org.apache.jackrabbit.mk.api.MicroKernel;
@@ -35,6 +38,11 @@ import org.apache.jackrabbit.mk.api.Micr
*/
public class Server {
+ /** java.net.ServerSocket's default backlog size. */
+ private static final int BACKLOG = 50;
+
+ private final ServerSocketFactory ssFactory;
+
private AtomicReference<MicroKernel> mkref;
private AtomicBoolean started = new AtomicBoolean();
@@ -45,7 +53,9 @@ public class Server {
private ExecutorService es;
- private int port = 28080;
+ private int port;
+
+ private InetAddress addr;
/**
* Create a new instance of this class.
@@ -53,10 +63,21 @@ public class Server {
* @param mk micro kernel
*/
public Server(MicroKernel mk) {
+ this(mk, ServerSocketFactory.getDefault());
this.mkref = new AtomicReference<MicroKernel>(mk);
}
/**
+ * Create a new instance of this class, using the given server socket factory.
+ *
+ * @param mk micro kernel
+ */
+ public Server(MicroKernel mk, ServerSocketFactory ssFactory) {
+ this.mkref = new AtomicReference<MicroKernel>(mk);
+ this.ssFactory = ssFactory;
+ }
+
+ /**
* Set port number to listen to.
*
* @param port port numbern
@@ -70,6 +91,16 @@ public class Server {
}
/**
+ * Set bind address.
+ */
+ public void setBindAddress(InetAddress addr) throws IllegalStateException {
+ if (started.get()) {
+ throw new IllegalStateException("Server already started.");
+ }
+ this.addr = addr;
+ }
+
+ /**
* Start this server.
*
* @throws IOException if an I/O error occurs
@@ -94,7 +125,7 @@ public class Server {
final Socket socket = ss.accept();
es.execute(new Runnable() {
public void run() {
- handle(socket);
+ process(socket);
}
});
}
@@ -104,7 +135,7 @@ public class Server {
}
private ServerSocket createServerSocket() throws IOException {
- return new ServerSocket(port);
+ return ssFactory.createServerSocket(port, BACKLOG, addr);
}
private ExecutorService createExecutorService() {
@@ -112,18 +143,24 @@ public class Server {
}
/**
- * Handle a connection attempt by a client.
+ * Process a connection attempt by a client.
*
* @param socket client socket
*/
- void handle(Socket socket) {
+ void process(Socket socket) {
try {
socket.setTcpNoDelay(true);
} catch (IOException e) {
/* ignore */
}
- HttpProcessor processor = new HttpProcessor(socket, new ServletImpl());
+ HttpProcessor processor = new HttpProcessor(socket, new Servlet() {
+ @Override
+ public void service(Request request, Response response)
+ throws IOException {
+ Server.this.service(request, response);
+ }
+ });
try {
processor.process();
@@ -136,27 +173,31 @@ public class Server {
}
}
- public InetSocketAddress getAddress() {
- if (!started.get() || stopped.get()) {
- return null;
+ /**
+ * Service a request.
+ *
+ * @param request request
+ * @param response response
+ * @throws IOException if an I/O error occurs
+ */
+ void service(Request request, Response response) throws IOException {
+ if (request.getMethod().equals("POST")) {
+ MicroKernelServlet.INSTANCE.service(mkref.get(), request, response);
+ } else {
+ FileServlet.INSTANCE.service(request, response);
}
- return (InetSocketAddress) ss.getLocalSocketAddress();
}
/**
- * Internal servlet that handles all requests to this server.
+ * Return the server's local socket address.
+ *
+ * @return socket address or <code>null</code> if the server is not started
*/
- class ServletImpl implements Servlet {
-
- public void service(Request request, Response response)
- throws IOException {
-
- if (request.getMethod().equals("POST")) {
- MicroKernelServlet.INSTANCE.service(mkref.get(), request, response);
- } else {
- FileServlet.INSTANCE.service(request, response);
- }
+ public InetSocketAddress getAddress() {
+ if (!started.get() || stopped.get()) {
+ return null;
}
+ return (InetSocketAddress) ss.getLocalSocketAddress();
}
/**
@@ -184,7 +225,7 @@ public class Server {
public static void main(String[] args) throws Exception {
if (args.length == 0) {
- System.out.println(String.format("usage: %s microkernel-url [port]",
+ System.out.println(String.format("usage: %s microkernel-url [port] [bindaddr]",
Server.class.getName()));
return;
}
@@ -194,6 +235,11 @@ public class Server {
final Server server = new Server(mk);
if (args.length >= 2) {
server.setPort(Integer.parseInt(args[1]));
+ } else {
+ server.setPort(28080);
+ }
+ if (args.length >= 3) {
+ server.setBindAddress(InetAddress.getByName(args[2]));
}
server.start();
Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/ChunkedInputStream.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/ChunkedInputStream.java?rev=1211510&r1=1211509&r2=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/ChunkedInputStream.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/ChunkedInputStream.java Wed Dec 7 16:24:05 2011
@@ -24,7 +24,7 @@ import java.util.Arrays;
/**
* Input stream that reads and decodes HTTP chunks, assuming that no chunk
- * exceeds 65535 bytes and that a chunk's length is represented by exactly 4
+ * exceeds 32768 bytes and that a chunk's length is represented by exactly 4
* hexadecimal characters.
*/
public class ChunkedInputStream extends FilterInputStream {
@@ -32,7 +32,7 @@ public class ChunkedInputStream extends
/**
* Maximum chunk size.
*/
- private static final int MAX_CHUNK_SIZE = 65535;
+ public static final int MAX_CHUNK_SIZE = 0x8000;
/**
* CR + LF combination.
@@ -183,6 +183,18 @@ public class ChunkedInputStream extends
}
/**
+ * Recycle this input stream.
+ *
+ * @param in new underlying input stream
+ */
+ public void recycle(InputStream in) {
+ this.in = in;
+
+ offset = length = 0;
+ lastChunk = false;
+ }
+
+ /**
* Close this input stream. Finishes reading any pending chunks until
* the last chunk is received. Does <b>not</b> close the underlying input
* stream.
Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/ChunkedOutputStream.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/ChunkedOutputStream.java?rev=1211510&r1=1211509&r2=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/ChunkedOutputStream.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/ChunkedOutputStream.java Wed Dec 7 16:24:05 2011
@@ -17,20 +17,18 @@
package org.apache.jackrabbit.mk.util;
import java.io.FilterOutputStream;
+
import java.io.IOException;
import java.io.OutputStream;
+import static org.apache.jackrabbit.mk.util.ChunkedInputStream.MAX_CHUNK_SIZE;
+
/**
* Output stream that encodes and writes HTTP chunks.
*/
public class ChunkedOutputStream extends FilterOutputStream {
/**
- * Maximum chunk size.
- */
- private static final int MAX_CHUNK_SIZE = 65535;
-
- /**
* CR + LF combination.
*/
private static final byte[] CRLF = "\r\n".getBytes();
@@ -156,11 +154,22 @@ public class ChunkedOutputStream extends
}
super.flush();
}
+
+ /**
+ * Recycle this output stream.
+ *
+ * @param out new underlying output stream
+ */
+ public void recycle(OutputStream out) {
+ this.out = out;
+ offset = 0;
+ }
/**
* Close this output stream. Flush the contents of the internal buffer
- * and writes the last chunk to the underlying output stream. Does
- * <b>not</b> close the underlying output stream.
+ * and writes the last chunk to the underlying output stream. Sets
+ * the internal reference to the underlying output stream to
+ * <code>null</code>. Does <b>not</b> close the underlying output stream.
*
* @see java.io.FilterOutputStream#close()
*/
Added: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/MemorySockets.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/MemorySockets.java?rev=1211510&view=auto
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/MemorySockets.java (added)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/MemorySockets.java Wed Dec 7 16:24:05 2011
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.mk.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+
+/**
+ * In-memory replacement for network sockets. A client socket and its
+ * server-side peer exchange data through a pair of pipes instead of a
+ * network connection. All factories share one static queue, so host, port,
+ * backlog and address arguments are ignored.
+ */
+public abstract class MemorySockets {
+
+    /** Server-side sockets waiting to be handed out by <code>accept()</code>. */
+    static final BlockingQueue<Socket> QUEUE = new LinkedBlockingQueue<Socket>();
+
+    /** Sentinel socket, used to signal a closed queue */
+    static final Socket SENTINEL = new Socket();
+
+    /** Buffer size, in bytes, of the pipes in both directions. */
+    static final int PIPE_SIZE = 8192;
+
+    /**
+     * Return the server socket factory. Every overload of
+     * <code>createServerSocket</code> ignores its arguments.
+     *
+     * @return server socket factory
+     */
+    public static ServerSocketFactory getServerSocketFactory() {
+        return new ServerSocketFactory() {
+            @Override
+            public ServerSocket createServerSocket() throws IOException {
+                return new ServerSocket() {
+                    /** Closed flag */
+                    private final AtomicBoolean closed = new AtomicBoolean();
+
+                    /**
+                     * Block until a client socket has been created and
+                     * return its server-side peer.
+                     *
+                     * @throws IOException if this server socket is closed
+                     * @throws InterruptedIOException if the waiting thread
+                     *         is interrupted; the interrupt flag stays set
+                     */
+                    @Override
+                    public Socket accept() throws IOException {
+                        try {
+                            while (!closed.get()) {
+                                Socket socket = QUEUE.take();
+                                if (socket != SENTINEL) {
+                                    return socket;
+                                }
+                                // Not closed, so this sentinel was left
+                                // behind by another server socket's close():
+                                // skip it instead of failing spuriously.
+                            }
+                            throw new IOException("closed");
+                        } catch (InterruptedException e) {
+                            // Restore the flag so callers still see it.
+                            Thread.currentThread().interrupt();
+                            IOException ioe = new InterruptedIOException("accept interrupted");
+                            ioe.initCause(e);
+                            throw ioe;
+                        }
+                    }
+
+                    /**
+                     * Report the local flag; <code>close()</code> below
+                     * does not go through <code>ServerSocket.close()</code>.
+                     */
+                    @Override
+                    public boolean isClosed() {
+                        return closed.get();
+                    }
+
+                    /**
+                     * Mark this server socket as closed. Only the first
+                     * call queues the sentinel.
+                     */
+                    @Override
+                    public void close() throws IOException {
+                        if (closed.compareAndSet(false, true)) {
+                            // Wakes up an accept() blocked on the queue.
+                            QUEUE.add(SENTINEL);
+                        }
+                    }
+                };
+            }
+
+            @Override
+            public ServerSocket createServerSocket(int port) throws IOException {
+                return createServerSocket();
+            }
+
+            @Override
+            public ServerSocket createServerSocket(int port, int backlog)
+                    throws IOException {
+                return createServerSocket();
+            }
+
+            @Override
+            public ServerSocket createServerSocket(int port, int backlog,
+                    InetAddress ifAddress) throws IOException {
+                return createServerSocket();
+            }
+        };
+    }
+
+    /**
+     * Return the socket factory. Every socket it creates is paired with a
+     * server-side peer that is put on the queue for <code>accept()</code>;
+     * every overload of <code>createSocket</code> ignores its arguments.
+     *
+     * @return socket factory
+     */
+    public static SocketFactory getSocketFactory() {
+        return new SocketFactory() {
+            @Override
+            public Socket createSocket() throws IOException {
+                PipedSocket socket = new PipedSocket();
+                QUEUE.add(new PipedSocket(socket));
+                return socket;
+            }
+
+            @Override
+            public Socket createSocket(InetAddress host, int port) throws IOException {
+                return createSocket();
+            }
+
+            @Override
+            public Socket createSocket(String host, int port) throws IOException,
+                    UnknownHostException {
+                return createSocket();
+            }
+
+            @Override
+            public Socket createSocket(String host, int port, InetAddress localHost,
+                    int localPort) throws IOException, UnknownHostException {
+                return createSocket();
+            }
+
+            @Override
+            public Socket createSocket(InetAddress address, int port,
+                    InetAddress localAddress, int localPort) throws IOException {
+                return createSocket();
+            }
+        };
+    }
+
+    /**
+     * Socket implementation, using pipes to exchange information between a
+     * pair of sockets.
+     */
+    static class PipedSocket extends Socket {
+
+        /** Input stream */
+        protected final PipedInputStream in;
+
+        /** Output stream */
+        protected final PipedOutputStream out;
+
+        /**
+         * Used to initialize the socket on the client side. Both streams
+         * stay unconnected until the server-side peer is created.
+         */
+        PipedSocket() {
+            in = new PipedInputStream(PIPE_SIZE);
+            out = new PipedOutputStream();
+        }
+
+        /**
+         * Used to initialize the socket on the server side. Connects to the
+         * client's streams, using the same buffer size as the client side.
+         *
+         * @param client client-side socket to pair with
+         * @throws IOException if the client's streams are already connected
+         */
+        PipedSocket(PipedSocket client) throws IOException {
+            in = new PipedInputStream(client.out, PIPE_SIZE);
+            out = new PipedOutputStream(client.in);
+        }
+
+        @Override
+        public InputStream getInputStream() throws IOException {
+            return in;
+        }
+
+        @Override
+        public OutputStream getOutputStream() throws IOException {
+            return out;
+        }
+
+        /**
+         * Close both pipe ends before closing the socket itself, so that
+         * the peer sees end-of-stream instead of waiting indefinitely.
+         */
+        @Override
+        public void close() throws IOException {
+            try {
+                out.close();
+                in.close();
+            } finally {
+                super.close();
+            }
+        }
+    }
+}
Propchange: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/MemorySockets.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/MemorySockets.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision Rev Url
Modified: jackrabbit/sandbox/microkernel/src/test/java/org/apache/jackrabbit/mk/MultiMkTestBase.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/test/java/org/apache/jackrabbit/mk/MultiMkTestBase.java?rev=1211510&r1=1211509&r2=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/test/java/org/apache/jackrabbit/mk/MultiMkTestBase.java (original)
+++ jackrabbit/sandbox/microkernel/src/test/java/org/apache/jackrabbit/mk/MultiMkTestBase.java Wed Dec 7 16:24:05 2011
@@ -38,7 +38,7 @@ public class MultiMkTestBase {
{"fs:{homeDir}/target"},
{"mem:"},
{"mem:fs:target/temp"},
- {"remote:fs:{homeDir}/target"}
+ {"http-bridge:fs:{homeDir}/target"}
});
}