You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by dp...@apache.org on 2011/12/07 17:24:05 UTC

svn commit: r1211510 - in /jackrabbit/sandbox/microkernel/src: main/java/org/apache/jackrabbit/mk/ main/java/org/apache/jackrabbit/mk/client/ main/java/org/apache/jackrabbit/mk/server/ main/java/org/apache/jackrabbit/mk/util/ test/java/org/apache/jackr...

Author: dpfister
Date: Wed Dec  7 16:24:05 2011
New Revision: 1211510

URL: http://svn.apache.org/viewvc?rev=1211510&view=rev
Log:
- make large objects in HTTP communication recyclable
- add MemorySockets (Sockets w/o network)

Added:
    jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Request.java
      - copied, changed from r1210840, jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Request.java
    jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/MemorySockets.java   (with props)
Modified:
    jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/MicroKernelFactory.java
    jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/Client.java
    jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/HttpExecutor.java
    jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/Request.java
    jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/HttpProcessor.java
    jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Response.java
    jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Server.java
    jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/ChunkedInputStream.java
    jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/ChunkedOutputStream.java
    jackrabbit/sandbox/microkernel/src/test/java/org/apache/jackrabbit/mk/MultiMkTestBase.java

Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/MicroKernelFactory.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/MicroKernelFactory.java?rev=1211510&r1=1211509&r2=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/MicroKernelFactory.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/MicroKernelFactory.java Wed Dec  7 16:24:05 2011
@@ -17,14 +17,11 @@
 package org.apache.jackrabbit.mk;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
+
 import org.apache.jackrabbit.mk.api.MicroKernel;
 import org.apache.jackrabbit.mk.client.Client;
 import org.apache.jackrabbit.mk.fs.FileUtils;
 import org.apache.jackrabbit.mk.mem.MemoryKernelImpl;
-import org.apache.jackrabbit.mk.server.Server;
 import org.apache.jackrabbit.mk.util.ExceptionFactory;
 import org.apache.jackrabbit.mk.wrapper.IndexWrapper;
 import org.apache.jackrabbit.mk.wrapper.LogWrapper;
@@ -76,27 +73,10 @@ public class MicroKernelFactory {
             }
             return new MicroKernelImpl(dir);
         } else if (url.startsWith("http:")) {
-            try {
-                URI uri = new URI(url);
-                return new Client(new InetSocketAddress(uri.getHost(), uri.getPort()));
-            } catch (URISyntaxException e) {
-                throw new IllegalArgumentException(e.getMessage());
-            }
-        } else if (url.startsWith("remote:")) {
-            try {
-                MicroKernel mk = getInstance(url.substring("remote:".length()));
-                final Server server = new Server(mk);
-                server.setPort(0);
-                server.start();
-                return new Client(server.getAddress()) {
-                    public void dispose() {
-                        super.dispose();
-                        server.stop();
-                    }
-                };
-            } catch (IOException e) {
-                throw new IllegalArgumentException(e.getMessage());
-            }
+            return Client.createHttpClient(url);
+        } else if (url.startsWith("http-bridge:")) {
+            MicroKernel mk = MicroKernelFactory.getInstance(url.substring("http-bridge:".length()));
+            return Client.createHttpBridge(mk);
         } else {
             throw new IllegalArgumentException(url);
         }

Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/Client.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/Client.java?rev=1211510&r1=1211509&r2=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/Client.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/Client.java Wed Dec  7 16:24:05 2011
@@ -20,14 +20,23 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.SocketFactory;
 
 import org.apache.jackrabbit.mk.api.MicroKernel;
 import org.apache.jackrabbit.mk.api.MicroKernelException;
+import org.apache.jackrabbit.mk.server.Server;
 import org.apache.jackrabbit.mk.util.IOUtils;
 
 /**
  * Client exposing a <code>MicroKernel</code> interface, that "remotes" commands
  * to a server.
+ * <p/>
+ * All public methods inside this class are completely synchronized because
+ * HttpExecutor is not thread-safe.
  */
 public class Client implements MicroKernel {
     
@@ -35,9 +44,51 @@ public class Client implements MicroKern
     
     private final InetSocketAddress addr;
     
+    private final SocketFactory socketFactory;
+
+    private final AtomicBoolean disposed = new AtomicBoolean();
+    
     private HttpExecutor executor;
     
-    private boolean disposed;
+    /**
+     * Create a new instance of this class, given a URL to connect to.
+     * 
+     * @param url url
+     * @return micro kernel
+     */
+    public static MicroKernel createHttpClient(String url) {
+        try {
+            URI uri = new URI(url);
+            return new Client(new InetSocketAddress(uri.getHost(), uri.getPort()));
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException(e.getMessage());
+        }
+    }
+    
+    /**
+     * Create a new instance of this class, where every request goes through an HTTP bridge
+     * before being delivered to a given micro kernel implementation.
+     * 
+     * @param mk micro kernel 
+     * @return bridged micro kernel
+     */
+    public static MicroKernel createHttpBridge(MicroKernel mk) {
+        final Server server = new Server(mk);
+        
+        try {
+            server.start();
+        } catch (IOException e) {
+            throw new IllegalArgumentException(e.getMessage());
+        }
+
+        return new Client(server.getAddress()) {
+            @Override
+            public synchronized void dispose() {
+                super.dispose();
+                server.stop();
+            }
+        };
+    }
     
     /**
      * Create a new instance of this class.
@@ -45,20 +96,26 @@ public class Client implements MicroKern
      * @param addr socket address
      */
     public Client(InetSocketAddress addr) {
-        this.addr = addr;
+        this(addr, SocketFactory.getDefault());
     }
 
+    /**
+     * Create a new instance of this class.
+     * 
+     * @param addr socket address
+     */
+    public Client(InetSocketAddress addr, SocketFactory socketFactory) {
+        this.addr = addr;
+        this.socketFactory = socketFactory;
+    }
+    
     //-------------------------------------------------- implements MicroKernel
     
     public synchronized void dispose() {
-        if (disposed) {
+        if (!disposed.compareAndSet(false, true)) {
             return;
         }
-        try {
-            IOUtils.closeQuietly(executor);
-        } finally {
-            disposed = true;
-        }
+        IOUtils.closeQuietly(executor);
     }
 
     public synchronized String getHeadRevision() throws MicroKernelException {
@@ -125,8 +182,7 @@ public class Client implements MicroKern
         }
     }
 
-    public synchronized String diff(String fromRevisionId, String toRevisionId,
-                       String path)
+    public synchronized String diff(String fromRevisionId, String toRevisionId, String path)
             throws MicroKernelException {
         Request request = null;
 
@@ -186,8 +242,8 @@ public class Client implements MicroKern
         }
     }
 
-    public synchronized String commit(String path, String jsonDiff, String revisionId, String message)
-            throws MicroKernelException {
+    public synchronized String commit(String path, String jsonDiff, String revisionId, 
+            String message) throws MicroKernelException {
         
         Request request = null;
 
@@ -275,7 +331,7 @@ public class Client implements MicroKern
      * @throws MicroKernelException if an exception occurs
      */
     private Request createRequest(String command) throws IOException, MicroKernelException {
-        if (disposed) {
+        if (disposed.get()) {
             throw new IllegalStateException("This instance has already been disposed");
         }
         if (executor != null && !executor.isAlive()) {
@@ -289,6 +345,9 @@ public class Client implements MicroKern
     }
     
     private Socket createSocket() throws IOException {
-        return new Socket(addr.getAddress(), addr.getPort());
+        if (addr == null) {
+            return socketFactory.createSocket();
+        }
+        return socketFactory.createSocket(addr.getAddress(), addr.getPort());
     }
 }

Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/HttpExecutor.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/HttpExecutor.java?rev=1211510&r1=1211509&r2=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/HttpExecutor.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/HttpExecutor.java Wed Dec  7 16:24:05 2011
@@ -37,6 +37,10 @@ import org.apache.jackrabbit.mk.util.IOU
 
 /**
  * Executes commands as HTTP requests.
+ * <p>
+ * This class is NOT thread-safe: its execute() method should operate within a
+ * lock, which must be held if a result input stream is returned UNTIL this
+ * stream is consumed or closed.
  */
 class HttpExecutor implements Closeable {
 
@@ -46,8 +50,12 @@ class HttpExecutor implements Closeable 
     
     private OutputStream socketOut;
     
+    private final ChunkedOutputStream bodyOut = new ChunkedOutputStream(null);
+    
+    private final ChunkedInputStream bodyIn = new ChunkedInputStream(null); 
+    
     private boolean connectionClosed;
-
+    
     /**
      * Create a new instance of this class.
      * 
@@ -84,7 +92,7 @@ class HttpExecutor implements Closeable 
         writeLine("Transfer-Encoding: chunked");
         writeLine("");
         
-        OutputStream bodyOut = new ChunkedOutputStream(socketOut);
+        bodyOut.recycle(socketOut);
         
         if (in != null) {
             String boundary = getBoundary();
@@ -143,7 +151,8 @@ class HttpExecutor implements Closeable 
         
         String encoding = headers.get("Transfer-Encoding");
         if ("chunked".equalsIgnoreCase(encoding)) {
-            reqIn = new ChunkedInputStream(socketIn);
+            bodyIn.recycle(socketIn);
+            reqIn = bodyIn;
         } else {
             int contentLength = -1;
             
@@ -181,10 +190,18 @@ class HttpExecutor implements Closeable 
         }
     }
     
+    /**
+     * Return a flag indicating whether the executor is alive.
+     * 
+     * @return <code>true</code> if it is alive; <code>false</code> otherwise
+     */
     public boolean isAlive() {
         return !connectionClosed && !socket.isClosed();
     }
     
+    /**
+     * Close this executor.
+     */
     public void close() {
         IOUtils.closeQuietly(socketOut);
         IOUtils.closeQuietly(socketIn);

Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/Request.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/Request.java?rev=1211510&r1=1211509&r2=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/Request.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/client/Request.java Wed Dec  7 16:24:05 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.jackrabbit.mk.util.IOUtils;
 
@@ -41,7 +42,7 @@ class Request implements Closeable {
     
     private InputStream resultIn;
     
-    private boolean executed;
+    private final AtomicBoolean executed = new AtomicBoolean();
     
     /**
      * Create a new instance of this class.
@@ -104,14 +105,10 @@ class Request implements Closeable {
      * @throws IOException if an I/O error occurs
      */
     public void execute() throws IOException {
-        if (executed) {
+        if (!executed.compareAndSet(false, true)) {
             return;
         }
-        try {
-            resultIn = executor.execute(command, params, in);
-        } finally {
-            executed = true;
-        }
+        resultIn = executor.execute(command, params, in);
     }
     
     /**

Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/HttpProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/HttpProcessor.java?rev=1211510&r1=1211509&r2=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/HttpProcessor.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/HttpProcessor.java Wed Dec  7 16:24:05 2011
@@ -34,7 +34,7 @@ class HttpProcessor {
     
     private static final int DEFAULT_SO_TIMEOUT = 30000;
     
-    private static final int MAX_KEEP_ALIVE_REQUESTS = 30;
+    private static final int MAX_KEEP_ALIVE_REQUESTS = 100;
 
     private final Socket socket;
     
@@ -43,6 +43,10 @@ class HttpProcessor {
     private InputStream socketIn;
     
     private OutputStream socketOut;
+    
+    private final Request request = new Request(); 
+
+    private final Response response = new Response(); 
 
     /**
      * Create a new instance of this class.
@@ -92,11 +96,8 @@ class HttpProcessor {
      * @throws IOException if an I/O error occurs
      */
     private boolean process(int requestNum) throws IOException {
-        Request request = null;
-        Response response = null;
-        
         try {
-            request = Request.parse(socketIn);
+            request.parse(socketIn);
         } catch (IOException e) {
             if (requestNum == 0) {
                 // ignore errors on the very first request (might be wrong protocol)
@@ -107,7 +108,7 @@ class HttpProcessor {
         try {
             boolean keepAlive = request.isKeepAlive() &&
                     (requestNum + 1 < MAX_KEEP_ALIVE_REQUESTS);
-            response = new Response(socketOut, keepAlive);
+            response.recycle(socketOut, keepAlive);
             servlet.service(request, response);
             return keepAlive;
         } finally {

Copied: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Request.java (from r1210840, jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Request.java)
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Request.java?p2=jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Request.java&p1=jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Request.java&r1=1210840&r2=1211510&rev=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Request.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Request.java Wed Dec  7 16:24:05 2011
@@ -22,7 +22,6 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URLDecoder;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -39,58 +38,54 @@ class Request implements Closeable {
 
     private InputStream in;
     
-    private final String method;
+    private String method;
     
-    private final String file;
-    
-    private final String protocol;
+    private String file;
     
     private String queryString;
     
-    private final Map<String,String> headers;
+    private String protocol;
+    
+    private final Map<String,String> headers = new LinkedHashMap<String,String>();
     
-    private Map<String, String> params;
+    private boolean paramsChecked;
     
+    private final Map<String, String> params = new LinkedHashMap<String,String>();
+    
+    private final ChunkedInputStream chunkedIn = new ChunkedInputStream(null);
+
     private InputStream reqIn;
     
     /**
-     * Create a new instance of this class.
+     * Parse a request. This automatically resets any internal state, so it can be
+     * used multiple times
      * 
-     * @param method HTTP method
-     * @param uri target URI
-     * @param headers request headers
-     * @param in request body
+     * @param in input stream
+     * @throws IOException if an I/O error occurs
      */
-    private Request(String method, String uri, String protocol,
-            Map<String,String> headers, InputStream in) {
-        this.method = method;
+    void parse(InputStream in) throws IOException {
+        String requestLine = readLine(in);
+        
+        String[] parts = requestLine.split(" ");
+        if (parts.length != 3) {
+            String msg = String.format("Bad HTTP request line: %s", requestLine);
+            throw new IOException(msg);
+        }
+        method = parts[0];
         
+        String uri = parts[1];
         int index = uri.lastIndexOf('?');
         if (index == -1) {
             file = uri;
+            queryString = null;
         } else {
             file = uri.substring(0, index);
             queryString = uri.substring(index + 1);
         }
         
-        this.protocol = protocol;
-        this.headers = headers;
-        this.in = in;
-    }
-    
-    public static Request parse(InputStream in) throws IOException {
-        String requestLine = readLine(in);
-        
-        String[] parts = requestLine.split(" ");
-        if (parts.length != 3) {
-            String msg = String.format("Bad HTTP request line: %s", requestLine);
-            throw new IOException(msg);
-        }
-        String method = parts[0];
-        String uri = parts[1];
-        String protocol = parts[2];
+        protocol = parts[2];
         
-        Map<String, String> headers = new LinkedHashMap<String, String>();
+        headers.clear();
         
         for (;;) {
             String headerLine = readLine(in);
@@ -102,7 +97,12 @@ class Request implements Closeable {
                 headers.put(parts[0].trim(), parts[1].trim());
             }
         }
-        return new Request(method, uri, protocol, headers, in);
+        
+        params.clear();
+        paramsChecked = false;
+        reqIn = null;
+        
+        this.in = in;
     }
     
     /**
@@ -170,14 +170,16 @@ class Request implements Closeable {
     }
 
     public String getParameter(String name) throws IOException {
-        if (params == null) {
-            params = new HashMap<String, String>();
-            
-            String contentType = getContentType();
-            if ("application/x-www-form-urlencoded".equals(contentType)) {
-                ByteArrayOutputStream out = new ByteArrayOutputStream();
-                IOUtils.copy(getInputStream(), out);
-                collectParameters(out.toString(), params);
+        if (!paramsChecked) {
+            try {
+                String contentType = getContentType();
+                if ("application/x-www-form-urlencoded".equals(contentType)) {
+                    ByteArrayOutputStream out = new ByteArrayOutputStream();
+                    IOUtils.copy(getInputStream(), out);
+                    collectParameters(out.toString(), params);
+                }
+            } finally {
+                paramsChecked = true;
             }
         }
         return params.get(name);
@@ -251,7 +253,8 @@ class Request implements Closeable {
         if (reqIn == null) {
             String encoding = headers.get("Transfer-Encoding");
             if ("chunked".equalsIgnoreCase(encoding)) {
-                reqIn = new ChunkedInputStream(in);
+                chunkedIn.recycle(in);
+                reqIn = chunkedIn;
             } else {
                 int contentLength = getContentLength();
                 if (contentLength == -1) {
@@ -269,11 +272,14 @@ class Request implements Closeable {
     
     public void close() {
         if (in != null) {
-            // Consume a possibly non-empty body by triggering the
-            // creation of our request input stream
-            getInputStream();
-            IOUtils.closeQuietly(reqIn);            
-            in = null;
+            try {
+                // Consume a possibly non-empty body by triggering the
+                // creation of our request input stream
+                getInputStream();
+                IOUtils.closeQuietly(reqIn);
+            } finally {
+                in = null;
+            }
         }
     }
 }

Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Response.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Response.java?rev=1211510&r1=1211509&r2=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Response.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Response.java Wed Dec  7 16:24:05 2011
@@ -24,6 +24,8 @@ import java.util.Map;
 
 import org.apache.jackrabbit.mk.util.IOUtils;
 
+import static org.apache.jackrabbit.mk.util.ChunkedInputStream.MAX_CHUNK_SIZE;
+
 /**
  * HTTP Response implementation.
  */
@@ -43,19 +45,28 @@ class Response implements Closeable {
     
     private String contentType;
     
-    private BodyOutputStream respOut;
+    private final BodyOutputStream bodyOut = new BodyOutputStream();
+    
+    private OutputStream respOut;
     
-    private Map<String,String> headers;
+    private final Map<String,String> headers = new LinkedHashMap<String, String>();
     
     /**
-     * Create a new instance of this class.
+     * Recycle this instance, using another output stream and a keep-alive flag.
      * 
      * @param out output stream
      * @param keepAlive whether to keep alive the connection
      */
-    public Response(OutputStream out, boolean keepAlive) {
+    void recycle(OutputStream out, boolean keepAlive) {
         this.out = out;
         this.keepAlive = keepAlive;
+
+        headersSent = committed = chunked = false;
+        statusCode = 0;
+        contentType = null;
+        bodyOut.reset();
+        respOut = null;
+        headers.clear();
     }
     
     /**
@@ -106,13 +117,11 @@ class Response implements Closeable {
         
         writeLine(String.format("HTTP/1.1 %d %s", statusCode, msg));
         
-        if (respOut != null) {
-            if (committed) {
-                writeLine(String.format("Content-Length: %d", respOut.getCount()));
-            } else {
-                chunked = true;
-                writeLine("Transfer-Encoding: chunked");
-            }
+        if (committed) {
+            writeLine(String.format("Content-Length: %d", bodyOut.getCount()));
+        } else {
+            chunked = true;
+            writeLine("Transfer-Encoding: chunked");
         }
         if (contentType != null) {
             writeLine(String.format("Content-Type: %s", contentType));
@@ -185,7 +194,7 @@ class Response implements Closeable {
 
     public OutputStream getOutputStream() {
         if (respOut == null) {
-            respOut = new BodyOutputStream();
+            respOut = bodyOut;
         }
         return respOut;
     }
@@ -195,9 +204,6 @@ class Response implements Closeable {
     }
     
     public void addHeader(String name, String value) {
-        if (headers == null) {
-            headers = new LinkedHashMap<String, String>();
-        }
         headers.put(name, value);
     }
     
@@ -214,7 +220,7 @@ class Response implements Closeable {
          * Buffer size chosen intentionally to not exceed maximum chunk
          * size we'd like to transmit.
          */
-        private final byte[] buf = new byte[0xFFFF];
+        private final byte[] buf = new byte[MAX_CHUNK_SIZE];
         
         private int offset;
 
@@ -259,6 +265,10 @@ class Response implements Closeable {
             }
         }
         
+        public void reset() {
+            offset = 0;
+        }
+        
         @Override
         public void close() throws IOException {
             flush();

Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Server.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Server.java?rev=1211510&r1=1211509&r2=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Server.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/server/Server.java Wed Dec  7 16:24:05 2011
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.mk.server;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
@@ -27,6 +28,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import javax.net.ServerSocketFactory;
+
 import org.apache.jackrabbit.mk.MicroKernelFactory;
 import org.apache.jackrabbit.mk.api.MicroKernel;
 
@@ -35,6 +38,11 @@ import org.apache.jackrabbit.mk.api.Micr
  */
 public class Server {
 
+    /** java.net.ServerSocket's default backlog size. */
+    private static final int BACKLOG = 50;
+    
+    private final ServerSocketFactory ssFactory;
+    
     private AtomicReference<MicroKernel> mkref;
     
     private AtomicBoolean started = new AtomicBoolean();
@@ -45,7 +53,9 @@ public class Server {
     
     private ExecutorService es;
     
-    private int port = 28080;
+    private int port;
+    
+    private InetAddress addr;
     
     /**
      * Create a new instance of this class.
@@ -53,10 +63,21 @@ public class Server {
      * @param mk micro kernel
      */
     public Server(MicroKernel mk) {
+        this(mk, ServerSocketFactory.getDefault());
         this.mkref = new AtomicReference<MicroKernel>(mk);
     }
 
     /**
+     * Create a new instance of this class.
+     * 
+     * @param mk micro kernel
+     */
+    public Server(MicroKernel mk, ServerSocketFactory ssFactory) {
+        this.mkref = new AtomicReference<MicroKernel>(mk);
+        this.ssFactory = ssFactory;
+    }
+    
+    /**
      * Set port number to listen to.
      * 
      * @param port port numbern
@@ -70,6 +91,16 @@ public class Server {
     }
     
     /**
+     * Set bind address.
+     */
+    public void setBindAddress(InetAddress addr) throws IllegalStateException {
+        if (started.get()) {
+            throw new IllegalStateException("Server already started.");
+        }
+        this.addr = addr;
+    }
+    
+    /**
      * Start this server.
      * 
      * @throws IOException if an I/O error occurs
@@ -94,7 +125,7 @@ public class Server {
                 final Socket socket = ss.accept();
                 es.execute(new Runnable() {
                     public void run() {
-                        handle(socket);
+                        process(socket);
                     }
                 });
             }
@@ -104,7 +135,7 @@ public class Server {
     }
     
     private ServerSocket createServerSocket() throws IOException {
-        return new ServerSocket(port);
+        return ssFactory.createServerSocket(port, BACKLOG, addr);
     }
     
     private ExecutorService createExecutorService() {
@@ -112,18 +143,24 @@ public class Server {
     }
     
     /**
-     * Handle a connection attempt by a client.
+     * Process a connection attempt by a client.
      * 
      * @param socket client socket
      */
-    void handle(Socket socket) {
+    void process(Socket socket) {
         try {
             socket.setTcpNoDelay(true);
         } catch (IOException e) {
             /* ignore */
         }
 
-        HttpProcessor processor = new HttpProcessor(socket, new ServletImpl());
+        HttpProcessor processor = new HttpProcessor(socket, new Servlet() {
+            @Override
+            public void service(Request request, Response response)
+                    throws IOException {
+                Server.this.service(request, response);
+            }
+        });
 
         try {
             processor.process();
@@ -136,27 +173,31 @@ public class Server {
         }
     }
     
-    public InetSocketAddress getAddress() {
-        if (!started.get() || stopped.get()) {
-            return null;
+    /**
+     * Service a request.
+     * 
+     * @param request request
+     * @param response response
+     * @throws IOException if an I/O error occurs
+     */
+    void service(Request request, Response response) throws IOException {
+        if (request.getMethod().equals("POST")) {
+            MicroKernelServlet.INSTANCE.service(mkref.get(), request, response);
+        } else {
+            FileServlet.INSTANCE.service(request, response);
         }
-        return (InetSocketAddress) ss.getLocalSocketAddress();
     }
     
     /**
-     * Internal servlet that handles all requests to this server.
+     * Return the server's local socket address.
+     * 
+     * @return socket address or <code>null</code> if the server is not started
      */
-    class ServletImpl implements Servlet {
-
-        public void service(Request request, Response response)
-                throws IOException {
-
-            if (request.getMethod().equals("POST")) {
-                MicroKernelServlet.INSTANCE.service(mkref.get(), request, response);
-            } else {
-                FileServlet.INSTANCE.service(request, response);
-            }
+    public InetSocketAddress getAddress() {
+        if (!started.get() || stopped.get()) {
+            return null;
         }
+        return (InetSocketAddress) ss.getLocalSocketAddress();
     }
     
     /**
@@ -184,7 +225,7 @@ public class Server {
     
     public static void main(String[] args) throws Exception {
         if (args.length == 0) {
-            System.out.println(String.format("usage: %s microkernel-url [port]", 
+            System.out.println(String.format("usage: %s microkernel-url [port] [bindaddr]", 
                     Server.class.getName()));
             return;
         }
@@ -194,6 +235,11 @@ public class Server {
         final Server server = new Server(mk);
         if (args.length >= 2) {
             server.setPort(Integer.parseInt(args[1]));
+        } else {
+            server.setPort(28080);
+        }
+        if (args.length >= 3) {
+            server.setBindAddress(InetAddress.getByName(args[2]));
         }
         server.start();
         

Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/ChunkedInputStream.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/ChunkedInputStream.java?rev=1211510&r1=1211509&r2=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/ChunkedInputStream.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/ChunkedInputStream.java Wed Dec  7 16:24:05 2011
@@ -24,7 +24,7 @@ import java.util.Arrays;
 
 /**
  * Input stream that reads and decodes HTTP chunks, assuming that no chunk
- * exceeds 65535 bytes and that a chunk's length is represented by exactly 4
+ * exceeds 32768 bytes and that a chunk's length is represented by exactly 4
  * hexadecimal characters.
  */
 public class ChunkedInputStream extends FilterInputStream {
@@ -32,7 +32,7 @@ public class ChunkedInputStream extends 
     /**
      * Maximum chunk size.
      */
-    private static final int MAX_CHUNK_SIZE = 65535;
+    public static final int MAX_CHUNK_SIZE = 0x8000;
 
     /**
      * CR + LF combination.
@@ -183,6 +183,18 @@ public class ChunkedInputStream extends 
     }
 
     /**
+     * Recycle this input stream.
+     * 
+     * @param out new underlying input stream
+     */
+    public void recycle(InputStream in) {
+        this.in = in;
+        
+        offset = length = 0;
+        lastChunk = false;
+    }
+    
+    /**
      * Close this input stream. Finishes reading any pending chunks until
      * the last chunk is received. Does <b>not</b> close the underlying input
      * stream.

Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/ChunkedOutputStream.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/ChunkedOutputStream.java?rev=1211510&r1=1211509&r2=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/ChunkedOutputStream.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/ChunkedOutputStream.java Wed Dec  7 16:24:05 2011
@@ -17,20 +17,18 @@
 package org.apache.jackrabbit.mk.util;
 
 import java.io.FilterOutputStream;
+
 import java.io.IOException;
 import java.io.OutputStream;
 
+import static org.apache.jackrabbit.mk.util.ChunkedInputStream.MAX_CHUNK_SIZE;
+
 /**
  * Output stream that encodes and writes HTTP chunks.
  */
 public class ChunkedOutputStream extends FilterOutputStream {
 
     /**
-     * Maximum chunk size.
-     */
-    private static final int MAX_CHUNK_SIZE = 65535;
-
-    /**
      * CR + LF combination.
      */
     private static final byte[] CRLF = "\r\n".getBytes();
@@ -156,11 +154,22 @@ public class ChunkedOutputStream extends
         }
         super.flush();
     }
+    
+    /**
+     * Recycle this output stream.
+     * 
+     * @param out new underlying output stream
+     */
+    public void recycle(OutputStream out) {
+        this.out = out;
+        offset = 0;
+    }
 
     /**
      * Close this output stream. Flush the contents of the internal buffer
-     * and writes the last chunk to the underlying output stream. Does
-     * <b>not</b> close the underlying output stream.
+     * and writes the last chunk to the underlying output stream. Sets
+     * the internal reference to the underlying output stream to 
+     * <code>null</code>. Does <b>not</b> close the underlying output stream.
      *
      * @see java.io.FilterOutputStream#close()
      */

Added: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/MemorySockets.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/MemorySockets.java?rev=1211510&view=auto
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/MemorySockets.java (added)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/MemorySockets.java Wed Dec  7 16:24:05 2011
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.mk.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+
+/**
+ * Memory sockets.
+ */
+public abstract class MemorySockets {
+
+    /** Sockets queue */
+    static final BlockingQueue<Socket> QUEUE = new LinkedBlockingQueue<Socket>();
+    
+    /** Sentinel socket, used to signal a closed queue */
+    static final Socket SENTINEL = new Socket();
+    
+    /**
+     * Return the server socket factory.
+     * 
+     * @return server socket factory
+     */
+    public static ServerSocketFactory getServerSocketFactory() {
+        return new ServerSocketFactory() {
+            @Override
+            public ServerSocket createServerSocket() throws IOException {
+                return new ServerSocket() {
+                    /** Closed flag */
+                    private final AtomicBoolean closed = new AtomicBoolean();
+                    
+                    @Override
+                    public Socket accept() throws IOException {
+                        if (closed.get()) {
+                            throw new IOException("closed");
+                        }
+                        try {
+                            Socket socket = QUEUE.take();
+                            if (socket == SENTINEL) {
+                                throw new IOException("closed");
+                            }
+                            return socket;
+                        } catch (InterruptedException e) {
+                            throw new InterruptedIOException();
+                        }
+                    }
+                    
+                    @Override
+                    public void close() throws IOException {
+                        if (closed.compareAndSet(false, true)) {
+                            QUEUE.add(SENTINEL);
+                        }
+                    }
+                };
+            }
+    
+            @Override
+            public ServerSocket createServerSocket(int port) throws IOException {
+                return createServerSocket();
+            }
+            
+            @Override
+            public ServerSocket createServerSocket(int port, int backlog)
+                    throws IOException {
+                
+                return createServerSocket();
+            }
+    
+            @Override
+            public ServerSocket createServerSocket(int port, int backlog,
+                    InetAddress ifAddress) throws IOException {
+    
+                return createServerSocket();
+            }
+        };        
+    }
+    
+    /**
+     * Return the socket factory.
+     * 
+     * @return socket factory
+     */
+    public static SocketFactory getSocketFactory() {
+        return new SocketFactory() {
+            @Override
+            public Socket createSocket() throws IOException {
+                PipedSocket socket = new PipedSocket();
+                QUEUE.add(new PipedSocket(socket));
+                return socket;
+            }
+            
+            @Override
+            public Socket createSocket(InetAddress host, int port) throws IOException {
+                return createSocket();
+            }
+
+            @Override
+            public Socket createSocket(String host, int port) throws IOException,
+                    UnknownHostException {
+                
+                return createSocket();
+            }
+
+            @Override
+            public Socket createSocket(String host, int port, InetAddress localHost,
+                    int localPort) throws IOException, UnknownHostException {
+
+                return createSocket();
+            }
+
+            @Override
+            public Socket createSocket(InetAddress address, int port,
+                    InetAddress localAddress, int localPort) throws IOException {
+
+                return createSocket();
+            }
+        };
+    };
+    
+    /**
+     * Socket implementation, using pipes to exchange information between a
+     * pair of sockets.
+     */
+    static class PipedSocket extends Socket {
+
+        /** Input stream */
+        protected final PipedInputStream in;
+
+        /** Output stream */
+        protected final PipedOutputStream out;
+        
+        /**
+         * Used to initialize the socket on the client side.
+         */
+        PipedSocket() {
+            in = new PipedInputStream(8192);
+            out = new PipedOutputStream();
+        }
+        
+        /**
+         * Used to initialize the socket on the server side.
+         */
+        PipedSocket(PipedSocket client) throws IOException {
+             in = new PipedInputStream(client.out);
+             out = new PipedOutputStream(client.in);
+        }
+
+        @Override
+        public InputStream getInputStream() throws IOException {
+            return in;
+        }
+        
+        @Override
+        public OutputStream getOutputStream() throws IOException {
+            return out;
+        }
+    }
+}

Propchange: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/MemorySockets.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/util/MemorySockets.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev Url

Modified: jackrabbit/sandbox/microkernel/src/test/java/org/apache/jackrabbit/mk/MultiMkTestBase.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/test/java/org/apache/jackrabbit/mk/MultiMkTestBase.java?rev=1211510&r1=1211509&r2=1211510&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/test/java/org/apache/jackrabbit/mk/MultiMkTestBase.java (original)
+++ jackrabbit/sandbox/microkernel/src/test/java/org/apache/jackrabbit/mk/MultiMkTestBase.java Wed Dec  7 16:24:05 2011
@@ -38,7 +38,7 @@ public class MultiMkTestBase {
                     {"fs:{homeDir}/target"},
                     {"mem:"},
                     {"mem:fs:target/temp"},
-                    {"remote:fs:{homeDir}/target"}
+                    {"http-bridge:fs:{homeDir}/target"}
                     });
     }