You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/06/18 16:15:23 UTC

svn commit: r1494141 - /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java

Author: jukka
Date: Tue Jun 18 14:15:23 2013
New Revision: 1494141

URL: http://svn.apache.org/r1494141
Log:
OAK-763: Asynchronous indexing

Prevent conflicting index updates by concurrently running async indexing tasks

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1494141&r1=1494140&r2=1494141&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Tue Jun 18 14:15:23 2013
@@ -28,8 +28,8 @@ import javax.annotation.Nonnull;
 
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.EditorDiff;
-import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
@@ -37,6 +37,8 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Objects;
+
 public class AsyncIndexUpdate implements Runnable {
 
     private static final Logger log = LoggerFactory
@@ -50,6 +52,9 @@ public class AsyncIndexUpdate implements
 
     private static final long DEFAULT_LIFETIME = TimeUnit.HOURS.toMillis(1);
 
+    private static final CommitFailedException CONCURRENT_UPDATE =
+            new CommitFailedException("Async", 1, "Concurrent update detected");
+
     private final String name;
 
     private final NodeStore store;
@@ -78,9 +83,9 @@ public class AsyncIndexUpdate implements
         NodeBuilder async = builder.child(ASYNC);
 
         NodeState before = null;
-        PropertyState property = async.getProperty(name);
-        if (property != null && property.getType() == STRING) {
-            before = store.retrieve(property.getValue(STRING));
+        final PropertyState state = async.getProperty(name);
+        if (state != null && state.getType() == STRING) {
+            before = store.retrieve(state.getValue(STRING));
         }
         if (before == null) {
             before = MISSING_NODE;
@@ -96,9 +101,25 @@ public class AsyncIndexUpdate implements
                 try {
                     async.setProperty(name, checkpoint);
                     branch.setRoot(builder.getNodeState());
-                    branch.merge(EmptyHook.INSTANCE);
+                    branch.merge(new CommitHook() {
+                        @Override @Nonnull
+                        public NodeState processCommit(
+                                NodeState before, NodeState after)
+                                throws CommitFailedException {
+                            // check for concurrent updates by this async task
+                            PropertyState stateAfterRebase =
+                                    before.getChildNode(ASYNC).getProperty(name);
+                            if (Objects.equal(state, stateAfterRebase)) {
+                                return after;
+                            } else {
+                                throw CONCURRENT_UPDATE;
+                            }
+                        }
+                    });
                 } catch (CommitFailedException e) {
-                    exception = e;
+                    if (e != CONCURRENT_UPDATE) {
+                        exception = e;
+                    }
                 }
             }