You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2012/04/28 23:09:03 UTC

svn commit: r1331832 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk: client/Client.java client/HttpExecutor.java client/Request.java server/MicroKernelServlet.java server/Server.java

Author: jukka
Date: Sat Apr 28 21:09:03 2012
New Revision: 1331832

URL: http://svn.apache.org/viewvc?rev=1331832&view=rev
Log:
OAK-78: waitForCommit() test failure for MK remoting

Allow concurrent use of Client and HttpExecutor.
Fix parameter name mismatch for remoting waitForCommit.

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/Client.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/HttpExecutor.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/Request.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/server/MicroKernelServlet.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/server/Server.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/Client.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/Client.java?rev=1331832&r1=1331831&r2=1331832&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/Client.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/Client.java Sat Apr 28 21:09:03 2012
@@ -96,15 +96,12 @@ public class Client implements MicroKern
     //-------------------------------------------------- implements MicroKernel
     
     @Override
-    public synchronized void dispose() {
-        if (!disposed.compareAndSet(false, true)) {
-            return;
-        }
-        IOUtils.closeQuietly(executor);
+    public void dispose() {
+	// do nothing
     }
 
     @Override
-    public synchronized String getHeadRevision() throws MicroKernelException {
+    public String getHeadRevision() throws MicroKernelException {
         Request request = null;
         
         try {
@@ -118,7 +115,7 @@ public class Client implements MicroKern
     }
 
     @Override
-    public synchronized String getRevisionHistory(long since, int maxEntries)
+    public String getRevisionHistory(long since, int maxEntries)
             throws MicroKernelException {
 
         Request request = null;
@@ -136,7 +133,7 @@ public class Client implements MicroKern
     }
 
     @Override
-    public synchronized String waitForCommit(String oldHeadRevisionId, long maxWaitMillis)
+    public String waitForCommit(String oldHeadRevisionId, long maxWaitMillis)
             throws MicroKernelException, InterruptedException {
 
         Request request = null;
@@ -154,7 +151,7 @@ public class Client implements MicroKern
     }
 
     @Override
-    public synchronized String getJournal(String fromRevisionId, String toRevisionId, String filter)
+    public String getJournal(String fromRevisionId, String toRevisionId, String filter)
             throws MicroKernelException {
         
         Request request = null;
@@ -173,7 +170,7 @@ public class Client implements MicroKern
     }
 
     @Override
-    public synchronized String diff(String fromRevisionId, String toRevisionId, String filter)
+    public String diff(String fromRevisionId, String toRevisionId, String filter)
             throws MicroKernelException {
         Request request = null;
 
@@ -191,7 +188,7 @@ public class Client implements MicroKern
     }
 
     @Override
-    public synchronized boolean nodeExists(String path, String revisionId)
+    public boolean nodeExists(String path, String revisionId)
             throws MicroKernelException {
 
         Request request = null;
@@ -209,7 +206,7 @@ public class Client implements MicroKern
     }
 
     @Override
-    public synchronized long getChildNodeCount(String path, String revisionId)
+    public long getChildNodeCount(String path, String revisionId)
             throws MicroKernelException {
 
         Request request = null;
@@ -227,14 +224,14 @@ public class Client implements MicroKern
     }
 
     @Override
-    public synchronized String getNodes(String path, String revisionId)
+    public String getNodes(String path, String revisionId)
             throws MicroKernelException {
 
         return getNodes(path, revisionId, 1, 0, -1, null);
     }
 
     @Override
-    public synchronized String getNodes(String path, String revisionId, int depth,
+    public String getNodes(String path, String revisionId, int depth,
             long offset, int count, String filter) throws MicroKernelException {
         
         Request request = null;
@@ -258,7 +255,7 @@ public class Client implements MicroKern
     }
 
     @Override
-    public synchronized String commit(String path, String jsonDiff, String revisionId,
+    public String commit(String path, String jsonDiff, String revisionId,
             String message) throws MicroKernelException {
         
         Request request = null;
@@ -278,7 +275,7 @@ public class Client implements MicroKern
     }
 
     @Override
-    public synchronized String branch(String trunkRevisionId)
+    public String branch(String trunkRevisionId)
             throws MicroKernelException {
 
         Request request = null;
@@ -295,7 +292,7 @@ public class Client implements MicroKern
     }
 
     @Override
-    public synchronized String merge(String branchRevisionId, String message)
+    public String merge(String branchRevisionId, String message)
             throws MicroKernelException {
 
         Request request = null;
@@ -313,7 +310,7 @@ public class Client implements MicroKern
     }
 
     @Override
-    public synchronized long getLength(String blobId) throws MicroKernelException {
+    public long getLength(String blobId) throws MicroKernelException {
         Request request = null;
 
         try {
@@ -328,7 +325,7 @@ public class Client implements MicroKern
     }
 
     @Override
-    public synchronized int read(String blobId, long pos, byte[] buff, int off, int length)
+    public int read(String blobId, long pos, byte[] buff, int off, int length)
             throws MicroKernelException {
 
         Request request = null;
@@ -347,7 +344,7 @@ public class Client implements MicroKern
     }
 
     @Override
-    public synchronized String write(InputStream in) throws MicroKernelException {
+    public String write(InputStream in) throws MicroKernelException {
         Request request = null;
 
         try {
@@ -385,23 +382,7 @@ public class Client implements MicroKern
      * @throws MicroKernelException if an exception occurs
      */
     private Request createRequest(String command) throws IOException, MicroKernelException {
-        if (disposed.get()) {
-            throw new IllegalStateException("This instance has already been disposed");
-        }
-        if (executor != null && !executor.isAlive()) {
-            IOUtils.closeQuietly(executor);
-            executor = null;
-        }
-        if (executor == null) {
-            executor = new HttpExecutor(createSocket());
-        }
-        return new Request(executor, command);
+        return new Request(socketFactory, addr, command);
     }
     
-    private Socket createSocket() throws IOException {
-        if (addr == null) {
-            return socketFactory.createSocket();
-        }
-        return socketFactory.createSocket(addr.getAddress(), addr.getPort());
-    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/HttpExecutor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/HttpExecutor.java?rev=1331832&r1=1331831&r2=1331832&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/HttpExecutor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/HttpExecutor.java Sat Apr 28 21:09:03 2012
@@ -28,6 +28,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.URLEncoder;
 import java.security.SecureRandom;
@@ -35,6 +36,8 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Random;
 
+import javax.net.SocketFactory;
+
 /**
  * Executes commands as HTTP requests.
  * <p>
@@ -59,10 +62,18 @@ class HttpExecutor implements Closeable 
     /**
      * Create a new instance of this class.
      * 
-     * @param socket socket
+     * @param socketFactory socket factory
+     * @param socketAddress server address
+     * @throws IOException if the server could not be contacted
      */
-    public HttpExecutor(Socket socket) {
-        this.socket = socket;
+    public HttpExecutor(SocketFactory socketFactory, InetSocketAddress socketAddress)
+	    throws IOException {
+        if (socketAddress != null) {
+            socket = socketFactory.createSocket(
+                    socketAddress.getAddress(), socketAddress.getPort());
+        } else {
+            socket = socketFactory.createSocket();
+        }
     }
     
     /**

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/Request.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/Request.java?rev=1331832&r1=1331831&r2=1331832&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/Request.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/Request.java Sat Apr 28 21:09:03 2012
@@ -20,9 +20,11 @@ import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.InetSocketAddress;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.SocketFactory;
 
 import org.apache.jackrabbit.mk.util.IOUtils;
 
@@ -31,8 +33,10 @@ import org.apache.jackrabbit.mk.util.IOU
  * implementation.
  */
 class Request implements Closeable {
-    
-    private HttpExecutor executor;
+
+    private final SocketFactory socketFactory;
+
+    private final InetSocketAddress socketAddress;
     
     private final String command;
     
@@ -40,21 +44,21 @@ class Request implements Closeable {
     
     private InputStream in;
     
-    private InputStream resultIn;
-    
-    private final AtomicBoolean executed = new AtomicBoolean();
-    
     /**
      * Create a new instance of this class.
      * 
-     * @param executor executor
+     * @param socketFactory socket factory
+     * @param socketAddress server address
      * @param command command name
      */
-    public Request(HttpExecutor executor, String command) {
-        this.executor = executor;
+    public Request(
+            SocketFactory socketFactory, InetSocketAddress socketAddress,
+            String command) {
+        this.socketFactory = socketFactory;
+        this.socketAddress = socketAddress;
         this.command = command;
     }
-    
+
     /**
      * Add a string parameter.
      *
@@ -108,13 +112,22 @@ class Request implements Closeable {
      * 
      * @throws IOException if an I/O error occurs
      */
-    public void execute() throws IOException {
-        if (!executed.compareAndSet(false, true)) {
-            return;
+    private byte[] execute() throws IOException {
+        HttpExecutor executor = new HttpExecutor(socketFactory, socketAddress);
+        try {
+            InputStream stream = executor.execute(command, params, in);
+            try {
+                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+                IOUtils.copy(stream, buffer);
+                return buffer.toByteArray();
+            } finally {
+                stream.close();
+            }
+        } finally {
+            executor.close();
         }
-        resultIn = executor.execute(command, params, in);
     }
-    
+
     /**
      * Return a string from the result stream. Automatically executes
      * the request first.
@@ -123,9 +136,7 @@ class Request implements Closeable {
      * @throws IOException if an I/O error occurs
      */
     public String getString() throws IOException {
-        execute();
-        
-        return new String(toByteArray(resultIn), "8859_1");
+        return new String(execute(), "8859_1");
     }
 
     /**
@@ -137,8 +148,6 @@ class Request implements Closeable {
      * @throws IOException if an I/O error occurs
      */
     public boolean getBoolean() throws IOException {
-        execute();
-        
         return Boolean.parseBoolean(getString());
     }
     
@@ -151,8 +160,6 @@ class Request implements Closeable {
      * @throws IOException if an I/O error occurs
      */
     public long getLong() throws IOException {
-        execute();
-        
         return Long.parseLong(getString());
     }
 
@@ -168,29 +175,23 @@ class Request implements Closeable {
      * @throws IOException if an I/O error occurs
      */
     public int read(byte[] b, int off, int len) throws IOException {
-        execute();
-        
-        int count = 0;
-        while (count < len) {
-            int n = resultIn.read(b, off + count, len - count);
-            if (n < 0) {
-                break;
-            }
-            count += n;
+        if (len == 0) {
+            return 0;
         }
-        return count == 0 && len != 0 ? -1 : count;
+
+        byte[] bytes = execute();
+        len = Math.min(bytes.length, len);
+        if (len == 0) {
+            return -1;
+        }
+
+        System.arraycopy(bytes, 0, b, off, len);
+        return len;
     }
-    
+
     @Override
     public void close() {
-        IOUtils.closeQuietly(resultIn);
-        
-        executor = null;
-    }
-    
-    private static byte[] toByteArray(InputStream in) throws IOException {
-        ByteArrayOutputStream out = new ByteArrayOutputStream(1024);
-        IOUtils.copy(in, out);
-        return out.toByteArray();
+        // do nothing
     }
+
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/server/MicroKernelServlet.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/server/MicroKernelServlet.java?rev=1331832&r1=1331831&r2=1331832&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/server/MicroKernelServlet.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/server/MicroKernelServlet.java Sat Apr 28 21:09:03 2012
@@ -126,7 +126,7 @@ class MicroKernelServlet {
 
             String headRevision = mk.getHeadRevision();
 
-            String oldHead = request.getParameter("old_revision_id", headRevision);
+            String oldHead = request.getParameter("revision_id", headRevision);
             long maxWaitMillis = request.getParameter("max_wait_millis", 0L);
 
             String currentHead;

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/server/Server.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/server/Server.java?rev=1331832&r1=1331831&r2=1331832&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/server/Server.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/server/Server.java Sat Apr 28 21:09:03 2012
@@ -143,7 +143,7 @@ public class Server {
     }
 
     private ExecutorService createExecutorService() {
-        return Executors.newFixedThreadPool(10);
+        return Executors.newCachedThreadPool();
     }
 
     /**