You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2012/04/28 23:09:03 UTC
svn commit: r1331832 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk: client/Client.java client/HttpExecutor.java client/Request.java server/MicroKernelServlet.java server/Server.java
Author: jukka
Date: Sat Apr 28 21:09:03 2012
New Revision: 1331832
URL: http://svn.apache.org/viewvc?rev=1331832&view=rev
Log:
OAK-78: waitForCommit() test failure for MK remoting
Allow concurrent use of Client and HttpExecutor.
Fix parameter name mismatch for remoting waitForCommit.
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/Client.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/HttpExecutor.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/Request.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/server/MicroKernelServlet.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/server/Server.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/Client.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/Client.java?rev=1331832&r1=1331831&r2=1331832&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/Client.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/Client.java Sat Apr 28 21:09:03 2012
@@ -96,15 +96,12 @@ public class Client implements MicroKern
//-------------------------------------------------- implements MicroKernel
@Override
- public synchronized void dispose() {
- if (!disposed.compareAndSet(false, true)) {
- return;
- }
- IOUtils.closeQuietly(executor);
+ public void dispose() {
+ // do nothing
}
@Override
- public synchronized String getHeadRevision() throws MicroKernelException {
+ public String getHeadRevision() throws MicroKernelException {
Request request = null;
try {
@@ -118,7 +115,7 @@ public class Client implements MicroKern
}
@Override
- public synchronized String getRevisionHistory(long since, int maxEntries)
+ public String getRevisionHistory(long since, int maxEntries)
throws MicroKernelException {
Request request = null;
@@ -136,7 +133,7 @@ public class Client implements MicroKern
}
@Override
- public synchronized String waitForCommit(String oldHeadRevisionId, long maxWaitMillis)
+ public String waitForCommit(String oldHeadRevisionId, long maxWaitMillis)
throws MicroKernelException, InterruptedException {
Request request = null;
@@ -154,7 +151,7 @@ public class Client implements MicroKern
}
@Override
- public synchronized String getJournal(String fromRevisionId, String toRevisionId, String filter)
+ public String getJournal(String fromRevisionId, String toRevisionId, String filter)
throws MicroKernelException {
Request request = null;
@@ -173,7 +170,7 @@ public class Client implements MicroKern
}
@Override
- public synchronized String diff(String fromRevisionId, String toRevisionId, String filter)
+ public String diff(String fromRevisionId, String toRevisionId, String filter)
throws MicroKernelException {
Request request = null;
@@ -191,7 +188,7 @@ public class Client implements MicroKern
}
@Override
- public synchronized boolean nodeExists(String path, String revisionId)
+ public boolean nodeExists(String path, String revisionId)
throws MicroKernelException {
Request request = null;
@@ -209,7 +206,7 @@ public class Client implements MicroKern
}
@Override
- public synchronized long getChildNodeCount(String path, String revisionId)
+ public long getChildNodeCount(String path, String revisionId)
throws MicroKernelException {
Request request = null;
@@ -227,14 +224,14 @@ public class Client implements MicroKern
}
@Override
- public synchronized String getNodes(String path, String revisionId)
+ public String getNodes(String path, String revisionId)
throws MicroKernelException {
return getNodes(path, revisionId, 1, 0, -1, null);
}
@Override
- public synchronized String getNodes(String path, String revisionId, int depth,
+ public String getNodes(String path, String revisionId, int depth,
long offset, int count, String filter) throws MicroKernelException {
Request request = null;
@@ -258,7 +255,7 @@ public class Client implements MicroKern
}
@Override
- public synchronized String commit(String path, String jsonDiff, String revisionId,
+ public String commit(String path, String jsonDiff, String revisionId,
String message) throws MicroKernelException {
Request request = null;
@@ -278,7 +275,7 @@ public class Client implements MicroKern
}
@Override
- public synchronized String branch(String trunkRevisionId)
+ public String branch(String trunkRevisionId)
throws MicroKernelException {
Request request = null;
@@ -295,7 +292,7 @@ public class Client implements MicroKern
}
@Override
- public synchronized String merge(String branchRevisionId, String message)
+ public String merge(String branchRevisionId, String message)
throws MicroKernelException {
Request request = null;
@@ -313,7 +310,7 @@ public class Client implements MicroKern
}
@Override
- public synchronized long getLength(String blobId) throws MicroKernelException {
+ public long getLength(String blobId) throws MicroKernelException {
Request request = null;
try {
@@ -328,7 +325,7 @@ public class Client implements MicroKern
}
@Override
- public synchronized int read(String blobId, long pos, byte[] buff, int off, int length)
+ public int read(String blobId, long pos, byte[] buff, int off, int length)
throws MicroKernelException {
Request request = null;
@@ -347,7 +344,7 @@ public class Client implements MicroKern
}
@Override
- public synchronized String write(InputStream in) throws MicroKernelException {
+ public String write(InputStream in) throws MicroKernelException {
Request request = null;
try {
@@ -385,23 +382,7 @@ public class Client implements MicroKern
* @throws MicroKernelException if an exception occurs
*/
private Request createRequest(String command) throws IOException, MicroKernelException {
- if (disposed.get()) {
- throw new IllegalStateException("This instance has already been disposed");
- }
- if (executor != null && !executor.isAlive()) {
- IOUtils.closeQuietly(executor);
- executor = null;
- }
- if (executor == null) {
- executor = new HttpExecutor(createSocket());
- }
- return new Request(executor, command);
+ return new Request(socketFactory, addr, command);
}
- private Socket createSocket() throws IOException {
- if (addr == null) {
- return socketFactory.createSocket();
- }
- return socketFactory.createSocket(addr.getAddress(), addr.getPort());
- }
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/HttpExecutor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/HttpExecutor.java?rev=1331832&r1=1331831&r2=1331832&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/HttpExecutor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/HttpExecutor.java Sat Apr 28 21:09:03 2012
@@ -28,6 +28,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URLEncoder;
import java.security.SecureRandom;
@@ -35,6 +36,8 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;
+import javax.net.SocketFactory;
+
/**
* Executes commands as HTTP requests.
* <p>
@@ -59,10 +62,18 @@ class HttpExecutor implements Closeable
/**
* Create a new instance of this class.
*
- * @param socket socket
+ * @param socketFactory socket factory
+ * @param socketAddress server address
+ * @throws IOException if the server could not be contacted
*/
- public HttpExecutor(Socket socket) {
- this.socket = socket;
+ public HttpExecutor(SocketFactory socketFactory, InetSocketAddress socketAddress)
+ throws IOException {
+ if (socketAddress != null) {
+ socket = socketFactory.createSocket(
+ socketAddress.getAddress(), socketAddress.getPort());
+ } else {
+ socket = socketFactory.createSocket();
+ }
}
/**
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/Request.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/Request.java?rev=1331832&r1=1331831&r2=1331832&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/Request.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/client/Request.java Sat Apr 28 21:09:03 2012
@@ -20,9 +20,11 @@ import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
+import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.SocketFactory;
import org.apache.jackrabbit.mk.util.IOUtils;
@@ -31,8 +33,10 @@ import org.apache.jackrabbit.mk.util.IOU
* implementation.
*/
class Request implements Closeable {
-
- private HttpExecutor executor;
+
+ private final SocketFactory socketFactory;
+
+ private final InetSocketAddress socketAddress;
private final String command;
@@ -40,21 +44,21 @@ class Request implements Closeable {
private InputStream in;
- private InputStream resultIn;
-
- private final AtomicBoolean executed = new AtomicBoolean();
-
/**
* Create a new instance of this class.
*
- * @param executor executor
+ * @param socketFactory socket factory
+ * @param socketAddress server address
* @param command command name
*/
- public Request(HttpExecutor executor, String command) {
- this.executor = executor;
+ public Request(
+ SocketFactory socketFactory, InetSocketAddress socketAddress,
+ String command) {
+ this.socketFactory = socketFactory;
+ this.socketAddress = socketAddress;
this.command = command;
}
-
+
/**
* Add a string parameter.
*
@@ -108,13 +112,22 @@ class Request implements Closeable {
*
* @throws IOException if an I/O error occurs
*/
- public void execute() throws IOException {
- if (!executed.compareAndSet(false, true)) {
- return;
+ private byte[] execute() throws IOException {
+ HttpExecutor executor = new HttpExecutor(socketFactory, socketAddress);
+ try {
+ InputStream stream = executor.execute(command, params, in);
+ try {
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ IOUtils.copy(stream, buffer);
+ return buffer.toByteArray();
+ } finally {
+ stream.close();
+ }
+ } finally {
+ executor.close();
}
- resultIn = executor.execute(command, params, in);
}
-
+
/**
* Return a string from the result stream. Automatically executes
* the request first.
@@ -123,9 +136,7 @@ class Request implements Closeable {
* @throws IOException if an I/O error occurs
*/
public String getString() throws IOException {
- execute();
-
- return new String(toByteArray(resultIn), "8859_1");
+ return new String(execute(), "8859_1");
}
/**
@@ -137,8 +148,6 @@ class Request implements Closeable {
* @throws IOException if an I/O error occurs
*/
public boolean getBoolean() throws IOException {
- execute();
-
return Boolean.parseBoolean(getString());
}
@@ -151,8 +160,6 @@ class Request implements Closeable {
* @throws IOException if an I/O error occurs
*/
public long getLong() throws IOException {
- execute();
-
return Long.parseLong(getString());
}
@@ -168,29 +175,23 @@ class Request implements Closeable {
* @throws IOException if an I/O error occurs
*/
public int read(byte[] b, int off, int len) throws IOException {
- execute();
-
- int count = 0;
- while (count < len) {
- int n = resultIn.read(b, off + count, len - count);
- if (n < 0) {
- break;
- }
- count += n;
+ if (len == 0) {
+ return 0;
}
- return count == 0 && len != 0 ? -1 : count;
+
+ byte[] bytes = execute();
+ len = Math.min(bytes.length, len);
+ if (len == 0) {
+ return -1;
+ }
+
+ System.arraycopy(bytes, 0, b, off, len);
+ return len;
}
-
+
@Override
public void close() {
- IOUtils.closeQuietly(resultIn);
-
- executor = null;
- }
-
- private static byte[] toByteArray(InputStream in) throws IOException {
- ByteArrayOutputStream out = new ByteArrayOutputStream(1024);
- IOUtils.copy(in, out);
- return out.toByteArray();
+ // do nothing
}
+
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/server/MicroKernelServlet.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/server/MicroKernelServlet.java?rev=1331832&r1=1331831&r2=1331832&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/server/MicroKernelServlet.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/server/MicroKernelServlet.java Sat Apr 28 21:09:03 2012
@@ -126,7 +126,7 @@ class MicroKernelServlet {
String headRevision = mk.getHeadRevision();
- String oldHead = request.getParameter("old_revision_id", headRevision);
+ String oldHead = request.getParameter("revision_id", headRevision);
long maxWaitMillis = request.getParameter("max_wait_millis", 0L);
String currentHead;
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/server/Server.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/server/Server.java?rev=1331832&r1=1331831&r2=1331832&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/server/Server.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/mk/server/Server.java Sat Apr 28 21:09:03 2012
@@ -143,7 +143,7 @@ public class Server {
}
private ExecutorService createExecutorService() {
- return Executors.newFixedThreadPool(10);
+ return Executors.newCachedThreadPool();
}
/**