You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by da...@apache.org on 2016/01/18 13:16:29 UTC

svn commit: r1725250 - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/atomic/ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/ oak-jcr/src/main/java/o...

Author: davide
Date: Mon Jan 18 12:16:29 2016
New Revision: 1725250

URL: http://svn.apache.org/viewvc?rev=1725250&view=rev
Log:
OAK-2472 - Add support for atomic counters on cluster solutions

Added:
    jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterClusterIT.java
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/package-info.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditor.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorProvider.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/atomic/package-info.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorTest.java
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/package-info.java
    jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterIT.java
    jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterTest.java
    jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java?rev=1725250&r1=1725249&r2=1725250&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java Mon Jan 18 12:16:29 2016
@@ -50,6 +50,7 @@ import javax.management.StandardMBean;
 import javax.security.auth.login.LoginException;
 
 import com.google.common.base.Function;
+import com.google.common.base.Supplier;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
@@ -64,6 +65,7 @@ import org.apache.jackrabbit.oak.api.jmx
 import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
 import org.apache.jackrabbit.oak.core.ContentRepositoryImpl;
 import org.apache.jackrabbit.oak.management.RepositoryManager;
+import org.apache.jackrabbit.oak.plugins.atomic.AtomicCounterEditorProvider;
 import org.apache.jackrabbit.oak.plugins.commit.ConflictHook;
 import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate;
 import org.apache.jackrabbit.oak.plugins.index.CompositeIndexEditorProvider;
@@ -549,6 +551,34 @@ public class Oak {
         return withAsyncIndexing("async", 5);
     }
 
+    public Oak withAtomicCounter() {
+        return with(new AtomicCounterEditorProvider(
+            new Supplier<Clusterable>() {
+                @Override
+                public Clusterable get() {
+                    return clusterable;
+                }
+            },
+            new Supplier<ScheduledExecutorService>() {
+                @Override
+                public ScheduledExecutorService get() {
+                    return scheduledExecutor;
+                }
+            }, 
+            new Supplier<NodeStore>() {
+                @Override
+                public NodeStore get() {
+                    return store;
+                }
+            },
+            new Supplier<Whiteboard>() {
+                @Override
+                public Whiteboard get() {
+                    return whiteboard;
+                }
+            }));
+    }
+    
     /**
      * <p>
      * Enable the asynchronous (background) indexing behavior for the provided

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/package-info.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/package-info.java?rev=1725250&r1=1725249&r2=1725250&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/package-info.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/package-info.java Mon Jan 18 12:16:29 2016
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-@Version("1.2.0")
+@Version("1.3.0")
 @Export(optional = "provide:=true")
 package org.apache.jackrabbit.oak;
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditor.java?rev=1725250&r1=1725249&r2=1725250&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditor.java Mon Jan 18 12:16:29 2016
@@ -23,18 +23,31 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.plugins.nodetype.NodeTypeConstants.MIX_ATOMIC_COUNTER;
 
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.memory.LongPropertyState;
+import org.apache.jackrabbit.oak.spi.commit.CommitHook;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.DefaultEditor;
 import org.apache.jackrabbit.oak.spi.commit.Editor;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
+import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Iterators;
 
 /**
@@ -47,13 +60,13 @@ import com.google.common.collect.Iterato
  * <p>
  * Whenever you add a {@link NodeTypeConstants#MIX_ATOMIC_COUNTER} mixin to a node it will turn it
  * into an atomic counter. Then in order to increment or decrement the {@code oak:counter} property
- * you'll need to set the {@code oak:increment} one ({@link #PROP_INCREMENT). Please note that the
+ * you'll need to set the {@code oak:increment} one ({@link #PROP_INCREMENT}). Please note that the
  * <strong>{@code oak:incremement} will never be saved</strong>, only the {@code oak:counter} will
  * be amended accordingly.
  * </p>
  * 
  * <p>
- *  So in order to deal with the counter from a JCR point of view you'll do something as follows 
+ * So in order to deal with the counter from a JCR point of view you'll do something as follows
  * </p>
  * 
  * <pre>
@@ -83,6 +96,32 @@ import com.google.common.collect.Iterato
  *  
  *  session.logout();
  * </pre>
+ * 
+ * <h3>Internal behavioural details</h3>
+ * 
+ * <p>
+ * The related jira ticket is <a href="https://issues.apache.org/jira/browse/OAK-2472">OAK-2472</a>.
+ * In a nutshell, when you save an {@code oak:increment}, behind the scenes it takes its value and
+ * increments an internal counter. There will be an individual counter for each cluster node.
+ * </p>
+ * 
+ * <p>
+ * Then it will consolidate all the internal counters into a single one: {@code oak:counter}. The
+ * consolidation process can happen either synchronously or asynchronously. Refer to
+ * {@link #AtomicCounterEditor(NodeBuilder, String, ScheduledExecutorService, NodeStore, Whiteboard)}
+ * for details on when it consolidates one way or the other.
+ * </p>
+ * 
+ * <p>
+ * <strong>synchronous</strong>. It means the consolidation, sum of all the internal counters, will
+ * happen in the same thread, during the lifecycle of the same commit.
+ * </p>
+ * 
+ * <p>
+ * <strong>asynchronous</strong>. It means the internal counters will be set during the same commit,
+ * but it will then schedule a separate task which will retry a number of times to consolidate
+ * them.
+ * </p>
  */
 public class AtomicCounterEditor extends DefaultEditor {
     /**
@@ -100,22 +139,78 @@ public class AtomicCounterEditor extends
      */
     public static final String PREFIX_PROP_COUNTER = ":oak-counter-";
     
+    /**
+     * prefix used internally for tracking the cluster node related revision numbers
+     */
+    public static final String PREFIX_PROP_REVISION = ":rev-";
+    
     private static final Logger LOG = LoggerFactory.getLogger(AtomicCounterEditor.class);
     private final NodeBuilder builder;
     private final String path;
-
+    private final String instanceId;
+    private final ScheduledExecutorService executor;
+    private final NodeStore store;
+    private final Whiteboard board;
+    
+    /**
+     * the current counter property name
+     */
+    private final String counterName;
+    
+    /**
+     * the current revision property name
+     */
+    private final String revisionName;
+    
     /**
      * instruct whether to update the node on leave.
      */
     private boolean update;
     
-    public AtomicCounterEditor(@Nonnull final NodeBuilder builder) {
-        this("", checkNotNull(builder));
+    /**
+     * <p>
+     * Create an instance of the editor for atomic increments. It can work synchronously as well as
+     * asynchronously. See class javadoc for details around it.
+     * </p>
+     * <p>
+     * If {@code instanceId} OR {@code executor} OR {@code store} OR {@code board} is null, the
+     * editor will switch to synchronous behaviour for consolidation. If no {@link CommitHook} is
+     * found in the whiteboard, the editor will fall back to synchronous consolidation as
+     * well.
+     * </p>
+     * 
+     * @param builder the builder on which to work. Cannot be null.
+     * @param instanceId the current Oak instance Id. If null editor will be synchronous.
+     * @param executor the current Oak executor service. If null editor will be synchronous.
+     * @param store the current Oak node store. If null the editor will be synchronous.
+     * @param board the current Oak {@link Whiteboard}. If null the editor will be synchronous.
+     */
+    public AtomicCounterEditor(@Nonnull final NodeBuilder builder, 
+                               @Nullable String instanceId,
+                               @Nullable ScheduledExecutorService executor,
+                               @Nullable NodeStore store,
+                               @Nullable Whiteboard board) {
+        this("", checkNotNull(builder), instanceId, executor, store, board);
     }
 
-    private AtomicCounterEditor(final String path, final NodeBuilder builder) {
+    private AtomicCounterEditor(final String path, 
+                                final NodeBuilder builder, 
+                                @Nullable String instanceId, 
+                                @Nullable ScheduledExecutorService executor,
+                                @Nullable NodeStore store,
+                                @Nullable Whiteboard board) {
         this.builder = checkNotNull(builder);
         this.path = path;
+        this.instanceId = Strings.isNullOrEmpty(instanceId) ? null : instanceId;
+        this.executor = executor;
+        this.store = store;
+        this.board = board;
+        
+        counterName = instanceId == null ? PREFIX_PROP_COUNTER : 
+            PREFIX_PROP_COUNTER + instanceId;
+        revisionName = instanceId == null ? PREFIX_PROP_REVISION :
+            PREFIX_PROP_REVISION + instanceId;
+
     }
 
     private static boolean shallWeProcessProperty(final PropertyState property,
@@ -152,13 +247,10 @@ public class AtomicCounterEditor extends
      * @param builder the builder to work on. Cannot be null.
      */
     public static void consolidateCount(@Nonnull final NodeBuilder builder) {
-        long count = builder.hasProperty(PROP_COUNTER)
-                        ? builder.getProperty(PROP_COUNTER).getValue(LONG)
-                        : 0;
+        long count = 0;
         for (PropertyState p : builder.getProperties()) {
             if (p.getName().startsWith(PREFIX_PROP_COUNTER)) {
                 count += p.getValue(LONG);
-                builder.removeProperty(p.getName());
             }
         }
 
@@ -167,7 +259,25 @@ public class AtomicCounterEditor extends
 
     private void setUniqueCounter(final long value) {
         update = true;
-        builder.setProperty(PREFIX_PROP_COUNTER + UUID.randomUUID(), value, LONG);
+        
+        PropertyState counter = builder.getProperty(counterName);
+        PropertyState revision = builder.getProperty(revisionName);
+        
+        long currentValue = 0;
+        if (counter != null) {
+            currentValue = counter.getValue(LONG).longValue();
+        }
+        
+        long currentRevision = 0;
+        if (revision != null) {
+            currentRevision = revision.getValue(LONG).longValue();
+        }
+        
+        currentValue += value;
+        currentRevision += 1;
+
+        builder.setProperty(counterName, currentValue, LONG);
+        builder.setProperty(revisionName, currentRevision, LONG);
     }
     
     @Override
@@ -180,21 +290,241 @@ public class AtomicCounterEditor extends
 
     @Override
     public Editor childNodeAdded(final String name, final NodeState after) throws CommitFailedException {
-        return new AtomicCounterEditor(path + '/' + name, builder.getChildNode(name));
+        return new AtomicCounterEditor(path + '/' + name, builder.getChildNode(name), instanceId,
+            executor, store, board);
     }
 
     @Override
     public Editor childNodeChanged(final String name, 
                                    final NodeState before, 
                                    final NodeState after) throws CommitFailedException {
-        return new AtomicCounterEditor(path + '/' + name, builder.getChildNode(name));
+        return new AtomicCounterEditor(path + '/' + name, builder.getChildNode(name), instanceId,
+            executor, store, board);
     }
 
     @Override
     public void leave(final NodeState before, final NodeState after) throws CommitFailedException {
         if (update) {
-            // TODO here is where the Async check could be done
-            consolidateCount(builder);
+            if (instanceId == null || store == null || executor == null || board == null) {
+                LOG.trace(
+                    "Executing synchronously. instanceId: {}, store: {}, executor: {}, board: {}",
+                    new Object[] { instanceId, store, executor, board });
+                consolidateCount(builder);
+            } else {                
+                CommitHook hook = WhiteboardUtils.getService(board, CommitHook.class);
+                if (hook == null) {
+                    LOG.trace("CommitHook not registered with Whiteboard. Falling back to sync.");
+                    consolidateCount(builder);
+                } else {
+                    long delay = 0;
+                    ConsolidatorTask t = new ConsolidatorTask(
+                        path, 
+                        builder.getProperty(revisionName), 
+                        store, 
+                        executor, 
+                        delay, 
+                        hook);
+                    LOG.debug("[{}] Scheduling process by {}secs", t.getName(), delay); 
+                    executor.schedule(t, delay, TimeUnit.SECONDS);                    
+                }
+            }
+        }
+    }
+    
+    public static class ConsolidatorTask implements Callable<Void> {
+        public static final long MAX_TIMEOUT = 32;
+        private final String name;
+        private final String p;
+        private final PropertyState rev;
+        private final NodeStore s;
+        private final ScheduledExecutorService exec;
+        private final long delay;
+        private final long start;
+        private final CommitHook hook;
+        
+        public ConsolidatorTask(@Nonnull String path, 
+                                @Nullable PropertyState revision, 
+                                @Nonnull NodeStore store,
+                                @Nonnull ScheduledExecutorService exec,
+                                long delay,
+                                @Nonnull CommitHook hook) {
+            this.start = System.currentTimeMillis();
+            p = checkNotNull(path);
+            rev = revision;
+            s = checkNotNull(store);
+            this.exec = checkNotNull(exec);
+            this.delay = delay;
+            this.hook = checkNotNull(hook);
+            this.name = UUID.randomUUID().toString();
+        }
+
+        private ConsolidatorTask(@Nonnull ConsolidatorTask task, long delay) {
+            checkNotNull(task);
+            this.p = task.p;
+            this.rev = task.rev;
+            this.s = task.s;
+            this.exec = task.exec;
+            this.delay = delay;
+            this.hook = task.hook;
+            this.name = task.name;
+            this.start = task.start;
+        }
+        
+        @Override
+        public Void call() throws Exception {            
+            try {
+                LOG.debug("[{}] Async consolidation running: path: {}, revision: {}", name, p, rev);
+                NodeBuilder root = s.getRoot().builder();
+                NodeBuilder b = builderFromPath(root, p);
+                
+                dumpNode(b, p);
+                
+                if (!b.exists()) {
+                    LOG.debug("[{}] Builder for '{}' from NodeStore not available. Rescheduling.",
+                        name, p);
+                    reschedule();
+                    return null;
+                }
+                
+                if (!checkRevision(b, rev)) {
+                    LOG.debug("[{}] Missing or not yet a valid revision for '{}'. Rescheduling.",
+                        name, p);
+                    reschedule();
+                    return null;
+                }
+
+                if (isConsolidate(b)) {
+                    LOG.trace("[{}] consolidating.", name);
+                    consolidateCount(b);
+                    s.merge(root, hook, CommitInfo.EMPTY);                    
+                } else {
+                    LOG.debug("[{}] Someone else consolidated. Skipping any operation.", name);
+                }
+            } catch (Exception e) {
+                LOG.debug("[{}] caught Exception. Rescheduling. {}", name, e.getMessage());
+                if (LOG.isTraceEnabled()) {
+                    // duplicating message in logs; but avoiding unnecessary stacktrace generation
+                    LOG.trace("[{}] caught Exception. Rescheduling.", name, e);
+                }
+                reschedule();
+                return null;
+            }
+            
+            LOG.debug("[{}] Consolidation for '{}', '{}' completed in {}ms", name, p, rev,
+                System.currentTimeMillis() - start);
+            return null;
+        }
+        
+        private void dumpNode(@Nonnull NodeBuilder b, String path) {
+            if (LOG.isTraceEnabled()) {
+                checkNotNull(b);
+                StringBuilder s = new StringBuilder();
+                for (PropertyState p : b.getProperties()) {
+                    s.append(p).append("\n");
+                }
+                LOG.trace("[{}] Node status for {}:\n{}", this.name, path, s);
+            }
+        }
+        
+        private void reschedule() {
+            long d = nextDelay(delay);
+            if (isTimedOut(d)) {
+                LOG.warn("[{}] The consolidator task for '{}' time out. Cancelling the retry.",
+                    name, p);
+                return;
+            }
+            
+            ConsolidatorTask task = new ConsolidatorTask(this, d);
+            LOG.debug("[{}] Rescheduling '{}' by {}sec", task.getName(), p, d);
+            exec.schedule(task, d, TimeUnit.SECONDS);
+        }
+        
+        public static long nextDelay(long currentDelay) {
+            if (currentDelay < 0) {
+                return 0;
+            }
+            if (currentDelay == 0) {
+                return 1;
+            }
+            if (currentDelay >= MAX_TIMEOUT) {
+                return Long.MAX_VALUE;
+            }
+            return currentDelay * 2;
+        }
+        
+        public static boolean isTimedOut(long delay) {
+            if (delay > MAX_TIMEOUT) {
+                return true;
+            }
+            return false;
+        }
+        
+        public String getName() {
+            return name;
+        }
+    }
+    
+    /**
+     * checks that the revision provided in the PropertyState is less than or equal to the one within
+     * the builder.
+     * 
+     * If {@code revision} is null it will always return {@code true}.
+     * 
+     * If {@code builder} does not contain the property it will always return false.
+     * 
+     * @param builder
+     * @param revision
+     * @return
+     */
+    static boolean checkRevision(@Nonnull NodeBuilder builder, @Nullable PropertyState revision) {
+        if (revision == null) {
+            return true;
+        }
+        String pName = revision.getName();
+        PropertyState builderRev = builder.getProperty(pName);
+        if (builderRev == null) {
+            return false;
+        }
+        
+        long brValue = builderRev.getValue(Type.LONG).longValue();
+        long rValue = revision.getValue(Type.LONG).longValue();
+        
+        if (brValue >= rValue) {
+            return true;
+        }
+        return false;
+    }
+    
+    private static NodeBuilder builderFromPath(@Nonnull NodeBuilder ancestor, @Nonnull String path) {
+        NodeBuilder b = checkNotNull(ancestor);
+        for (String name : PathUtils.elements(checkNotNull(path))) {
+            b = b.getChildNode(name);
+        }
+        return b;
+    }
+    
+    /**
+     * check whether the provided builder has to be consolidated or not. A node has to be
+     * consolidated if the sum of all the hidden counters does not match the exposed one. It could
+     * happen that some other nodes previously saw our change and already consolidated it.
+     * 
+     * @param b the builder to check. Cannot be null.
+     * @return true if the sum of the hidden counters does not match the exposed one.
+     */
+    static boolean isConsolidate(@Nonnull NodeBuilder b) {
+        checkNotNull(b);
+        PropertyState counter = b.getProperty(PROP_COUNTER);
+        if (counter == null) {
+            counter = LongPropertyState.createLongProperty(PROP_COUNTER, 0);
+        }
+        
+        long hiddensum = 0;
+        for (PropertyState p : b.getProperties()) {
+            if (p.getName().startsWith(PREFIX_PROP_COUNTER)) {
+                hiddensum += p.getValue(LONG).longValue();
+            }
         }
+        
+        return counter.getValue(LONG).longValue() != hiddensum;
     }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorProvider.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorProvider.java?rev=1725250&r1=1725249&r2=1725250&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorProvider.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorProvider.java Mon Jan 18 12:16:29 2016
@@ -16,28 +16,213 @@
  */
 package org.apache.jackrabbit.oak.plugins.atomic;
 
+import static org.apache.felix.scr.annotations.ReferenceCardinality.OPTIONAL_UNARY;
+import static org.apache.felix.scr.annotations.ReferencePolicy.DYNAMIC;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+
+import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
 import org.apache.felix.scr.annotations.Service;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.Editor;
 import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
+import org.apache.jackrabbit.oak.spi.state.Clusterable;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
+import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
- * Provide an instance of {@link AtomicCounterEditor}
+ * Provide an instance of {@link AtomicCounterEditor}. See {@link AtomicCounterEditor} for
+ * behavioural details.
  */
 @Component
 @Property(name = "type", value = "atomicCounter", propertyPrivate = true)
 @Service(EditorProvider.class)
 public class AtomicCounterEditorProvider implements EditorProvider {
+    private static final Logger LOG = LoggerFactory.getLogger(AtomicCounterEditorProvider.class);
+
+    @Reference(policy = ReferencePolicy.DYNAMIC, cardinality = ReferenceCardinality.OPTIONAL_UNARY, referenceInterface = Clusterable.class)
+    private AtomicReference<Clusterable> cluster = new AtomicReference<Clusterable>();
+
+    @Reference(policy = DYNAMIC, cardinality = OPTIONAL_UNARY, referenceInterface = NodeStore.class)
+    private volatile AtomicReference<NodeStore> store = new AtomicReference<NodeStore>();    
+
+    private volatile AtomicReference<ScheduledExecutorService> scheduler = new AtomicReference<ScheduledExecutorService>();
+    private volatile AtomicReference<Whiteboard> whiteboard = new AtomicReference<Whiteboard>();
+    
+    private final Supplier<Clusterable> clusterSupplier;
+    private final Supplier<ScheduledExecutorService> schedulerSupplier;
+    private final Supplier<NodeStore> storeSupplier;
+    private final Supplier<Whiteboard> wbSupplier;
+    
+    /**
+     * OSGi oriented constructor where all the required dependencies will be taken care of.
+     */
+    public AtomicCounterEditorProvider() {
+        clusterSupplier = new Supplier<Clusterable>() {
+            @Override
+            public Clusterable get() {
+                return cluster.get();
+            }
+        };
+        schedulerSupplier = new Supplier<ScheduledExecutorService>() {
+            @Override
+            public ScheduledExecutorService get() {
+                return scheduler.get();
+            }
+        };
+        storeSupplier = new Supplier<NodeStore>() {
+            @Override
+            public NodeStore get() {
+                return store.get();
+            }
+        };
+        wbSupplier = new Supplier<Whiteboard>() {
+            @Override
+            public Whiteboard get() {
+                return whiteboard.get();
+            }
+        };
+    }
+
+    /**
+     * <p>
+     * Plain Java oriented constructor. Refer to
+     * {@link AtomicCounterEditor#AtomicCounterEditor(NodeBuilder, String, ScheduledExecutorService, NodeStore, Whiteboard)}
+     * for constructions details of the actual editor.
+     * </p>
+     * 
+     * <p>
+     * Based on the use case this may need the constructor parameters to be already set during the
+     * repository construction. Please ensure they're registered before this provider is registered.
+     * </p>
+     * 
+     * @param clusterInfo cluster node information
+     * @param executor the executor for running asynchronously.
+     * @param store reference to the NodeStore.
+     * @param whiteboard the underlying board for picking up the registered {@link CommitHook}
+     */
+    public AtomicCounterEditorProvider(@Nullable Supplier<Clusterable> clusterInfo, 
+                                       @Nullable Supplier<ScheduledExecutorService> executor,
+                                       @Nullable Supplier<NodeStore> store,
+                                       @Nullable Supplier<Whiteboard> whiteboard) {
+        this.clusterSupplier = clusterInfo;
+        this.schedulerSupplier = executor;
+        this.storeSupplier = store;
+        this.wbSupplier = whiteboard;
+    }
+    
+    /**
+     * convenience method wrapping logic around the {@link Supplier}
+     * 
+     * @return
+     */
+    private String getInstanceId() {
+        Clusterable c = clusterSupplier.get();
+        if (c == null) {
+            return null;
+        } else {
+            return c.getInstanceId();
+        }
+    }
+    
+    /**
+     * convenience method wrapping logic around the {@link Supplier}
+     * 
+     * @return
+     */
+    private ScheduledExecutorService getScheduler() {
+        return schedulerSupplier.get();
+    }
     
+    /**
+     * convenience method wrapping logic around the {@link Supplier}
+     * 
+     * @return
+     */
+    private NodeStore getStore() {
+        return storeSupplier.get();
+    }
+    
+    /**
+     * Convenience method wrapping logic around the {@link Supplier}
+     * 
+     * @return
+     */
+    private Whiteboard getBoard() {
+        return wbSupplier.get();
+    }
+    
+    @Activate
+    public void activate(BundleContext context) {
+        whiteboard.set(new OsgiWhiteboard(context));
+        ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat("atomic-counter-%d").build();
+        scheduler.set(Executors.newScheduledThreadPool(10, tf));
+    }
+    
+    @Deactivate
+    public void deactivate() {
+        ScheduledExecutorService ses = getScheduler();
+        if (ses == null) {
+            LOG.debug("No ScheduledExecutorService found");
+        } else {
+            LOG.debug("Shutting down ScheduledExecutorService");
+            try {
+                ses.shutdown();
+                ses.awaitTermination(5, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                LOG.error("InterruptedException white shutting down ScheduledExecutorService", e);
+            } finally {
+                if (!ses.isTerminated()) {
+                    LOG.debug("ScheduledExecutorService not yet shutdown. Cancelling tasks and forcing quit.");
+                }
+                ses.shutdownNow();
+            }
+        }
+    }
+
+    protected void bindCluster(Clusterable store) {
+        this.cluster.set(store);
+    }
+
+    protected void unbindCluster(Clusterable store) {
+        this.cluster.compareAndSet(store, null);
+    }
+
+    protected void bindStore(NodeStore store) {
+        this.store.set(store);
+    }
+    
+    protected void unbindStore(NodeStore store) {
+        this.store.compareAndSet(store, null);
+    }
+
     @Override
     public Editor getRootEditor(final NodeState before, final NodeState after,
                                 final NodeBuilder builder, final CommitInfo info)
-                                    throws CommitFailedException {        
-        return new AtomicCounterEditor(builder);
+                                    throws CommitFailedException {
+        return new AtomicCounterEditor(builder, getInstanceId(), getScheduler(), getStore(),
+            getBoard());
     }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/atomic/package-info.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/atomic/package-info.java?rev=1725250&r1=1725249&r2=1725250&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/atomic/package-info.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/atomic/package-info.java Mon Jan 18 12:16:29 2016
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-@Version("1.0.1")
+@Version("2.0.0")
 @Export(optional = "provide:=true")
 package org.apache.jackrabbit.oak.plugins.atomic;
 

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorTest.java?rev=1725250&r1=1725249&r2=1725250&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorTest.java Mon Jan 18 12:16:29 2016
@@ -22,84 +22,146 @@ import static org.apache.jackrabbit.JcrC
 import static org.apache.jackrabbit.oak.api.Type.LONG;
 import static org.apache.jackrabbit.oak.api.Type.NAMES;
 import static org.apache.jackrabbit.oak.plugins.atomic.AtomicCounterEditor.PREFIX_PROP_COUNTER;
+import static org.apache.jackrabbit.oak.plugins.atomic.AtomicCounterEditor.PREFIX_PROP_REVISION;
 import static org.apache.jackrabbit.oak.plugins.atomic.AtomicCounterEditor.PROP_COUNTER;
 import static org.apache.jackrabbit.oak.plugins.atomic.AtomicCounterEditor.PROP_INCREMENT;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 import static org.apache.jackrabbit.oak.plugins.nodetype.NodeTypeConstants.MIX_ATOMIC_COUNTER;
+import static org.apache.jackrabbit.oak.spi.commit.CommitInfo.EMPTY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
-import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.fixture.NodeStoreFixture;
+import org.apache.jackrabbit.oak.plugins.memory.LongPropertyState;
 import org.apache.jackrabbit.oak.plugins.memory.PropertyStates;
+import org.apache.jackrabbit.oak.spi.commit.CommitHook;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.Editor;
+import org.apache.jackrabbit.oak.spi.commit.EditorHook;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.Clusterable;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import org.junit.Ignore;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.whiteboard.DefaultWhiteboard;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.services.sqs.model.UnsupportedOperationException;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 
 public class AtomicCounterEditorTest {
-    @Test
-    @Ignore // FIXME fix test expectations
-    public void childNodeAdded() throws CommitFailedException {
-        NodeBuilder builder = EMPTY_NODE.builder();
-        
-        Editor editor = new AtomicCounterEditor(EMPTY_NODE.builder());
-        
-        assertNull("without the mixin we should not process",
-            editor.childNodeAdded("foo", builder.getNodeState()));
-        
-        builder = EMPTY_NODE.builder();
-        builder = setMixin(builder);
-        assertTrue("with the mixin set we should get a proper Editor",
-            editor.childNodeAdded("foo", builder.getNodeState()) instanceof AtomicCounterEditor);
+    /**
+     * convenience class to ease construction during tests
+     */
+    private static class TestableACEProvider extends AtomicCounterEditorProvider {
+        public TestableACEProvider(final Clusterable c, final ScheduledExecutorService e,
+                                   final NodeStore s, final Whiteboard b) {
+            super(new Supplier<Clusterable>() {
+                @Override
+                public Clusterable get() {
+                    return c;
+                };
+            }, new Supplier<ScheduledExecutorService>() {
+                @Override
+                public ScheduledExecutorService get() {
+                    return e;
+                };
+            }, new Supplier<NodeStore>() {
+                @Override
+                public NodeStore get() {
+                    return s;
+                }
+            }, new Supplier<Whiteboard>() {
+                @Override
+                public Whiteboard get() {
+                    return b;
+                };
+            });
+        }
     }
+    private static final Clusterable CLUSTER_1 = new Clusterable() {
+        @Override
+        public String getInstanceId() {
+            return "1";
+        }
+    };
+    private static final Clusterable CLUSTER_2 = new Clusterable() {
+        @Override
+        public String getInstanceId() {
+            return "2";
+        }
+    };
+    private static final EditorHook HOOK_NO_CLUSTER = new EditorHook(
+        new TestableACEProvider(null, null, null, null));
+    private static final EditorHook HOOK_1_SYNC = new EditorHook(
+        new TestableACEProvider(CLUSTER_1, null, null, null));
+    private static final EditorHook HOOK_2_SYNC = new EditorHook(
+        new TestableACEProvider(CLUSTER_2, null, null, null));
+
+    private static final PropertyState INCREMENT_BY_1 = PropertyStates.createProperty(
+        PROP_INCREMENT, 1L);
+    private static final PropertyState INCREMENT_BY_2 = PropertyStates.createProperty(
+        PROP_INCREMENT, 2L);
     
     @Test
     public void increment() throws CommitFailedException {
         NodeBuilder builder;
         Editor editor;
-        PropertyState property;
         
         builder = EMPTY_NODE.builder();
-        editor = new AtomicCounterEditor(builder);
-        property = PropertyStates.createProperty(PROP_INCREMENT, 1L, Type.LONG);
-        editor.propertyAdded(property);
+        editor = new AtomicCounterEditor(builder, null, null, null, null);
+        editor.propertyAdded(INCREMENT_BY_1);
         assertNoCounters(builder.getProperties());
         
         builder = EMPTY_NODE.builder();
         builder = setMixin(builder);
-        editor = new AtomicCounterEditor(builder);
-        property = PropertyStates.createProperty(PROP_INCREMENT, 1L, Type.LONG);
-        editor.propertyAdded(property);
+        editor = new AtomicCounterEditor(builder, null, null, null, null);
+        editor.propertyAdded(INCREMENT_BY_1);
         assertNull("the oak:increment should never be set", builder.getProperty(PROP_INCREMENT));
-        assertTotalCounters(builder.getProperties(), 1);
+        assertTotalCountersValue(builder.getProperties(), 1);
     }
     
     @Test
     public void consolidate() throws CommitFailedException {
         NodeBuilder builder;
         Editor editor;
-        PropertyState property;
         
         builder = EMPTY_NODE.builder();
         builder = setMixin(builder);
-        editor = new AtomicCounterEditor(builder);
-        property = PropertyStates.createProperty(PROP_INCREMENT, 1L, Type.LONG);
+        editor = new AtomicCounterEditor(builder, null, null, null, null);
         
-        editor.propertyAdded(property);
-        assertTotalCounters(builder.getProperties(), 1);
-        editor.propertyAdded(property);
-        assertTotalCounters(builder.getProperties(), 2);
+        editor.propertyAdded(INCREMENT_BY_1);
+        assertTotalCountersValue(builder.getProperties(), 1);
+        editor.propertyAdded(INCREMENT_BY_1);
+        assertTotalCountersValue(builder.getProperties(), 2);
         AtomicCounterEditor.consolidateCount(builder);
-        assertNotNull(builder.getProperty(PROP_COUNTER));
-        assertEquals(2, builder.getProperty(PROP_COUNTER).getValue(LONG).longValue());
-        assertNoCounters(builder.getProperties());
+        assertCounterNodeState(builder, ImmutableSet.of(PREFIX_PROP_COUNTER, PREFIX_PROP_REVISION), 2);
     }
 
     /**
@@ -122,7 +184,7 @@ public class AtomicCounterEditorTest {
      * 
      * @param properties
      */
-    private static void assertTotalCounters(@Nonnull final Iterable<? extends PropertyState> properties,
+    private static void assertTotalCountersValue(@Nonnull final Iterable<? extends PropertyState> properties,
                                             int expected) {
         int total = 0;
         for (PropertyState p : checkNotNull(properties)) {
@@ -137,4 +199,446 @@ public class AtomicCounterEditorTest {
     private static NodeBuilder setMixin(@Nonnull final NodeBuilder builder) {
         return checkNotNull(builder).setProperty(JCR_MIXINTYPES, of(MIX_ATOMIC_COUNTER), NAMES);
     }
+    
+    
+    private static void assertCounterNodeState(@Nonnull NodeBuilder builder, 
+                                               @Nonnull Set<String> hiddenProps, 
+                                               long expectedCounter) {
+        checkNotNull(builder);
+        checkNotNull(hiddenProps);
+        long totalHiddenValue = 0;
+        PropertyState counter = builder.getProperty(PROP_COUNTER);
+        Set<String> hp = Sets.newHashSet(hiddenProps);
+        
+        assertNotNull("counter property cannot be null", counter);
+        assertNull("The increment property should not be there",
+            builder.getProperty(PROP_INCREMENT));
+        for (PropertyState p : builder.getProperties()) {
+            String name = p.getName();
+            if (name.startsWith(":")) {
+                assertTrue("Unexpected hidden property found: " + name, hp.remove(name));
+            }
+            if (name.startsWith(PREFIX_PROP_COUNTER)) {
+                totalHiddenValue += p.getValue(LONG).longValue();
+            }
+        }
+        assertEquals("The sum of the hidden properties does not match the counter", counter
+            .getValue(LONG).longValue(), totalHiddenValue);
+        assertEquals("The counter does not match the expected value", expectedCounter, counter
+            .getValue(LONG).longValue());
+    }
+
+    private static NodeBuilder incrementBy(@Nonnull NodeBuilder builder, @Nonnull PropertyState increment) {
+        return checkNotNull(builder).setProperty(checkNotNull(increment));
+    }
+    
+    @Test
+    public void notCluster() throws CommitFailedException {
+        NodeBuilder builder;
+        NodeState before, after;
+        
+        builder = EMPTY_NODE.builder();
+        before = builder.getNodeState();
+        builder = setMixin(builder);
+        builder = incrementBy(builder, INCREMENT_BY_1);
+        after = builder.getNodeState();
+        builder = HOOK_NO_CLUSTER.processCommit(before, after, EMPTY).builder();
+        assertCounterNodeState(builder, ImmutableSet.of(PREFIX_PROP_COUNTER, PREFIX_PROP_REVISION), 1);
+
+        before = builder.getNodeState();
+        builder = incrementBy(builder, INCREMENT_BY_2);
+        after = builder.getNodeState(); 
+        builder = HOOK_NO_CLUSTER.processCommit(before, after, EMPTY).builder();
+        assertCounterNodeState(builder, ImmutableSet.of(PREFIX_PROP_COUNTER, PREFIX_PROP_REVISION), 3);
+    }
+    
+    /**
+     * Simulates updates from two oak instances, each ending up with its own hidden properties.
+     * @throws CommitFailedException if any of the simulated commits fails
+     */
+    @Test
+    public void multipleNodeUpdates() throws CommitFailedException {
+        NodeBuilder builder;
+        NodeState before, after;
+        
+        builder = EMPTY_NODE.builder();
+        before = builder.getNodeState(); 
+        builder = setMixin(builder);
+        builder = incrementBy(builder, INCREMENT_BY_1);
+        after = builder.getNodeState();
+        builder = HOOK_1_SYNC.processCommit(before, after, EMPTY).builder();
+        assertCounterNodeState(
+            builder, 
+            ImmutableSet.of(PREFIX_PROP_COUNTER + "1", PREFIX_PROP_REVISION + "1"),
+            1);
+        
+        before = builder.getNodeState();
+        builder = incrementBy(builder, INCREMENT_BY_1);
+        after = builder.getNodeState();
+        builder = HOOK_2_SYNC.processCommit(before, after, EMPTY).builder();
+        assertCounterNodeState(
+            builder,
+            ImmutableSet.of(
+                PREFIX_PROP_COUNTER + "1",
+                PREFIX_PROP_COUNTER + "2", 
+                PREFIX_PROP_REVISION + "1",
+                PREFIX_PROP_REVISION + "2"),
+            2);
+    }
+    
+    /**
+     * Checks that an instance's hidden revision grows by one per commit made by that instance only.
+     * @throws CommitFailedException if any of the simulated commits fails
+     */
+    @Test
+    public void revisionIncrements() throws CommitFailedException {
+        NodeBuilder builder;
+        NodeState before, after;
+        PropertyState rev;
+        
+        builder = EMPTY_NODE.builder();
+        before = builder.getNodeState();
+        builder = setMixin(builder);
+        builder = incrementBy(builder, INCREMENT_BY_1);
+        after = builder.getNodeState();
+        builder = HOOK_1_SYNC.processCommit(before, after, EMPTY).builder();
+        rev = builder.getProperty(PREFIX_PROP_REVISION + "1");
+        assertNotNull(rev);
+        assertEquals(1, rev.getValue(LONG).longValue());
+
+        before = builder.getNodeState();
+        builder = incrementBy(builder, INCREMENT_BY_2);
+        after = builder.getNodeState();
+        builder = HOOK_1_SYNC.processCommit(before, after, EMPTY).builder();
+        rev = builder.getProperty(PREFIX_PROP_REVISION + "1");
+        assertNotNull(rev);
+        assertEquals(2, rev.getValue(LONG).longValue());
+
+        before = builder.getNodeState();
+        builder = incrementBy(builder, INCREMENT_BY_1);
+        after = builder.getNodeState();
+        builder = HOOK_2_SYNC.processCommit(before, after, EMPTY).builder();
+        rev = builder.getProperty(PREFIX_PROP_REVISION + "1");
+        assertNotNull(rev);
+        assertEquals(2, rev.getValue(LONG).longValue());
+        rev = builder.getProperty(PREFIX_PROP_REVISION + "2");
+        assertNotNull(rev);
+        assertEquals(1, rev.getValue(LONG).longValue());
+    }
+    
+    @Test
+    public void singleNodeAsync() throws CommitFailedException, InterruptedException, ExecutionException {
+        NodeStore store = NodeStoreFixture.MEMORY_NS.createNodeStore();
+        MyExecutor exec1 = new MyExecutor();
+        Whiteboard board = new DefaultWhiteboard();
+        EditorHook hook1 = new EditorHook(new TestableACEProvider(CLUSTER_1, exec1, store, board));
+        NodeBuilder builder, root;
+        PropertyState p;
+        
+        board.register(CommitHook.class, EmptyHook.INSTANCE, null);
+        
+        root = store.getRoot().builder();
+        builder = root.child("c");
+        builder = setMixin(builder);
+        builder = incrementBy(builder, INCREMENT_BY_1);
+        store.merge(root, hook1, CommitInfo.EMPTY);
+        
+        // as we're providing all the information we expect the counter not to be consolidated
+        // until the scheduled process has run
+        builder = store.getRoot().builder().getChildNode("c");
+        assertTrue(builder.exists());
+        p = builder.getProperty(PREFIX_PROP_REVISION + CLUSTER_1.getInstanceId());
+        assertNotNull(p);
+        assertEquals(1, p.getValue(LONG).longValue());
+        p = builder.getProperty(PREFIX_PROP_COUNTER + CLUSTER_1.getInstanceId());
+        assertNotNull(p);
+        assertEquals(1, p.getValue(LONG).longValue());
+        p = builder.getProperty(PROP_COUNTER);
+        assertNull(p);
+        
+        // executing the consolidation
+        exec1.execute();
+        
+        // fetching the latest store state to see the changes
+        builder = store.getRoot().builder().getChildNode("c");
+        assertTrue("the counter node should exists", builder.exists());
+        assertCounterNodeState(
+            builder,
+            ImmutableSet.of(PREFIX_PROP_COUNTER + CLUSTER_1.getInstanceId(),
+                PREFIX_PROP_REVISION + CLUSTER_1.getInstanceId()), 1);
+    }
+    
+    @Test
+    public void noHookInWhiteboard() throws CommitFailedException, InterruptedException, ExecutionException {
+        NodeStore store = NodeStoreFixture.MEMORY_NS.createNodeStore();
+        MyExecutor exec1 = new MyExecutor();
+        Whiteboard board = new DefaultWhiteboard();
+        EditorHook hook1 = new EditorHook(new TestableACEProvider(CLUSTER_1, exec1, store, board));
+        NodeBuilder builder, root;
+        PropertyState p;
+        
+        root = store.getRoot().builder();
+        builder = root.child("c");
+        builder = setMixin(builder);
+        builder = incrementBy(builder, INCREMENT_BY_1);
+        store.merge(root, hook1, CommitInfo.EMPTY);
+        
+        // no CommitHook is registered in the whiteboard, so we expect the counter to be
+        // consolidated synchronously by the merge itself rather than by the scheduled process
+        builder = store.getRoot().builder().getChildNode("c");
+        assertTrue(builder.exists());
+        p = builder.getProperty(PREFIX_PROP_REVISION + CLUSTER_1.getInstanceId());
+        assertNotNull(p);
+        assertEquals(1, p.getValue(LONG).longValue());
+        p = builder.getProperty(PREFIX_PROP_COUNTER + CLUSTER_1.getInstanceId());
+        assertNotNull(p);
+        assertEquals(1, p.getValue(LONG).longValue());
+        p = builder.getProperty(PROP_COUNTER);
+        assertNotNull("the counter should have been consolidated synchronously", p);
+        assertEquals(1, p.getValue(LONG).longValue());
+        
+        assertTrue("without a registered hook it should have fell to sync", exec1.isEmpty());
+        
+        // fetching the latest store state to see the changes
+        builder = store.getRoot().builder().getChildNode("c");
+        assertTrue("the counter node should exists", builder.exists());
+        assertCounterNodeState(
+            builder,
+            ImmutableSet.of(PREFIX_PROP_COUNTER + CLUSTER_1.getInstanceId(),
+                PREFIX_PROP_REVISION + CLUSTER_1.getInstanceId()), 1);
+    }
+    
+    /**
+     * A fake {@link ScheduledExecutorService} which never runs anything on its own: scheduled
+     * tasks are queued and {@link #execute()} runs the oldest queued one (FIFO order).
+     */
+    private static class MyExecutor extends AbstractExecutorService implements ScheduledExecutorService {
+        private static final Logger LOG = LoggerFactory.getLogger(MyExecutor.class);
+        
+        // tasks waiting for execute(); only touched from synchronized methods
+        private final Queue<ScheduledFuture<?>> fifo = new LinkedList<ScheduledFuture<?>>();
+
+        /**
+         * Builds the exception thrown by every operation this fake does not support. The type is
+         * fully qualified because the enclosing file imports an unrelated class with the same
+         * simple name, which would otherwise shadow the JDK one.
+         */
+        private static java.lang.UnsupportedOperationException notImplemented() {
+            return new java.lang.UnsupportedOperationException("Not implemented");
+        }
+
+        @Override
+        public void shutdown() {
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            return new LinkedList<Runnable>(); // an empty list instead of null, safe to use as is
+        }
+
+        @Override
+        public boolean isShutdown() {
+            return false;
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return false;
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+            return false;
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            throw notImplemented();
+        }
+
+        @Override
+        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+            throw notImplemented();
+        }
+
+        private synchronized void addToQueue(@Nonnull ScheduledFuture<?> future) {
+            fifo.add(future);
+        }
+        
+        /**
+         * Tells whether there is no scheduled task left waiting to be executed.
+         * 
+         * @return {@code true} if the underlying queue is empty, {@code false} otherwise.
+         */
+        public synchronized boolean isEmpty() {
+            return fifo.isEmpty();
+        }
+        
+        // oldest queued task, or null when nothing is queued
+        private synchronized ScheduledFuture<?> getFromQueue() {
+            return fifo.poll();
+        }
+        
+        @Override
+        public <V> ScheduledFuture<V> schedule(final Callable<V> callable, long delay, TimeUnit unit) {
+            LOG.debug("Scheduling with delay: {} and unit: {} the process {}", delay, unit, callable);
+            
+            checkNotNull(callable);
+            checkNotNull(unit);
+                        
+            ScheduledFuture<V> future = new ScheduledFuture<V>() {
+                @Override
+                public long getDelay(TimeUnit unit) {
+                    throw notImplemented();
+                }
+
+                @Override
+                public int compareTo(Delayed o) {
+                    throw notImplemented();
+                }
+
+                @Override
+                public boolean cancel(boolean mayInterruptIfRunning) {
+                    throw notImplemented();
+                }
+
+                @Override
+                public boolean isCancelled() {
+                    throw notImplemented();
+                }
+
+                @Override
+                public boolean isDone() {
+                    throw notImplemented();
+                }
+
+                @Override
+                public V get() throws InterruptedException, ExecutionException {
+                    try {
+                        return callable.call();
+                    } catch (Exception e) {
+                        throw new ExecutionException(e);
+                    }
+                }
+
+                @Override
+                public V get(long timeout, TimeUnit unit) throws InterruptedException,
+                                                         ExecutionException, TimeoutException {
+                    throw notImplemented();
+                }
+            };
+            
+            addToQueue(future);
+            return future;
+        }
+
+        /**
+         * Executes the first item scheduled in the queue. If the queue is empty it silently
+         * returns {@code null}, which may also be what a scheduled process itself returns.
+         * 
+         * @return the result of the {@link ScheduledFuture} or {@code null} if the queue is empty.
+         * @throws InterruptedException propagated from the {@link ScheduledFuture#get()} call
+         * @throws ExecutionException if the scheduled process threw an exception
+         */
+        @CheckForNull
+        public Object execute() throws InterruptedException, ExecutionException {
+            ScheduledFuture<?> f = getFromQueue();
+            if (f == null) {
+                return null;
+            } else {
+                return f.get();
+            }
+        }
+        
+        @Override
+        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
+                                                      long period, TimeUnit unit) {
+            throw notImplemented();
+        }
+
+        @Override
+        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
+                                                         long delay, TimeUnit unit) {
+            throw notImplemented();
+        }
+    }
+    
+    @Test
+    public void checkRevision() {
+        NodeBuilder b = EMPTY_NODE.builder();
+        PropertyState r = LongPropertyState.createLongProperty("r", 10L);
+        
+        assertTrue(AtomicCounterEditor.checkRevision(b, null));
+        assertFalse(AtomicCounterEditor.checkRevision(b, r));
+        
+        b.setProperty(LongPropertyState.createLongProperty(r.getName(), 1L));
+        assertFalse(AtomicCounterEditor.checkRevision(b, r));
+        
+        b.setProperty(LongPropertyState.createLongProperty(r.getName(), 10L));
+        assertTrue(AtomicCounterEditor.checkRevision(b, r));
+
+        b.setProperty(LongPropertyState.createLongProperty(r.getName(), 20L));
+        assertTrue(AtomicCounterEditor.checkRevision(b, r));
+    }
+    
+    @Test
+    public void nextDelay() {
+        assertEquals(0, AtomicCounterEditor.ConsolidatorTask.nextDelay(-23456789));
+        assertEquals(1, AtomicCounterEditor.ConsolidatorTask.nextDelay(0));
+        assertEquals(2, AtomicCounterEditor.ConsolidatorTask.nextDelay(1));
+        assertEquals(4, AtomicCounterEditor.ConsolidatorTask.nextDelay(2));
+        assertEquals(8, AtomicCounterEditor.ConsolidatorTask.nextDelay(4));
+        assertEquals(16, AtomicCounterEditor.ConsolidatorTask.nextDelay(8));
+        assertEquals(32, AtomicCounterEditor.ConsolidatorTask.nextDelay(16));
+        assertEquals(Long.MAX_VALUE,
+            AtomicCounterEditor.ConsolidatorTask
+                .nextDelay(AtomicCounterEditor.ConsolidatorTask.MAX_TIMEOUT));
+        assertEquals(Long.MAX_VALUE, AtomicCounterEditor.ConsolidatorTask.nextDelay(45678));
+    }
+    
+    @Test
+    public void isTimeOut() {
+        assertFalse(AtomicCounterEditor.ConsolidatorTask.isTimedOut(0));
+        assertFalse(AtomicCounterEditor.ConsolidatorTask.isTimedOut(1));
+        assertFalse(AtomicCounterEditor.ConsolidatorTask.isTimedOut(2));
+        assertFalse(AtomicCounterEditor.ConsolidatorTask.isTimedOut(4));
+        assertFalse(AtomicCounterEditor.ConsolidatorTask.isTimedOut(8));
+        assertFalse(AtomicCounterEditor.ConsolidatorTask.isTimedOut(16));
+        assertFalse(AtomicCounterEditor.ConsolidatorTask.isTimedOut(32));
+        assertTrue(AtomicCounterEditor.ConsolidatorTask
+            .isTimedOut(AtomicCounterEditor.ConsolidatorTask.MAX_TIMEOUT + 1)); // any number > 32
+        assertTrue(AtomicCounterEditor.ConsolidatorTask.isTimedOut(Long.MAX_VALUE));
+    }
+    
+    @Test
+    public void isConsolidate() {
+        NodeBuilder b = EMPTY_NODE.builder();
+        PropertyState counter, hidden1, hidden2;
+        String hidden1Name = PREFIX_PROP_COUNTER + "1";
+        String hidden2Name = PREFIX_PROP_COUNTER + "2";
+        
+        assertFalse(AtomicCounterEditor.isConsolidate(b));
+        
+        counter = LongPropertyState.createLongProperty(PROP_COUNTER, 0);
+        hidden1 = LongPropertyState.createLongProperty(hidden1Name, 1);
+        b.setProperty(counter);
+        b.setProperty(hidden1);
+        assertTrue(AtomicCounterEditor.isConsolidate(b));
+
+        counter = LongPropertyState.createLongProperty(PROP_COUNTER, 1);
+        hidden1 = LongPropertyState.createLongProperty(hidden1Name, 1);
+        hidden2 = LongPropertyState.createLongProperty(hidden2Name, 1);
+        b.setProperty(counter);
+        b.setProperty(hidden1);
+        b.setProperty(hidden2);
+        assertTrue(AtomicCounterEditor.isConsolidate(b));
+
+        counter = LongPropertyState.createLongProperty(PROP_COUNTER, 2);
+        hidden1 = LongPropertyState.createLongProperty(hidden1Name, 1);
+        hidden2 = LongPropertyState.createLongProperty(hidden2Name, 1);
+        b.setProperty(counter);
+        b.setProperty(hidden1);
+        b.setProperty(hidden2);
+        assertFalse(AtomicCounterEditor.isConsolidate(b));
+    }
 }

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java?rev=1725250&r1=1725249&r2=1725250&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java Mon Jan 18 12:16:29 2016
@@ -31,7 +31,6 @@ import javax.jcr.Repository;
 import org.apache.jackrabbit.oak.Oak;
 import org.apache.jackrabbit.oak.api.ContentRepository;
 import org.apache.jackrabbit.oak.jcr.repository.RepositoryImpl;
-import org.apache.jackrabbit.oak.plugins.atomic.AtomicCounterEditorProvider;
 import org.apache.jackrabbit.oak.plugins.commit.ConflictValidatorProvider;
 import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.counter.NodeCounterEditorProvider;
@@ -106,7 +105,7 @@ public class Jcr {
     private Repository repository;
     
     private Clusterable clusterable;
-
+    
     public Jcr(Oak oak) {
         this.oak = oak;
 
@@ -121,7 +120,7 @@ public class Jcr {
         with(new NamespaceEditorProvider());
         with(new TypeEditorProvider());
         with(new ConflictValidatorProvider());
-        with(new AtomicCounterEditorProvider());
+        
         with(new ReferenceEditorProvider());
         with(new ReferenceIndexProvider());
 
@@ -156,6 +155,12 @@ public class Jcr {
         return this;
     }
 
+    public Jcr withAtomicCounter() {
+        ensureRepositoryIsNotCreated();
+        oak.withAtomicCounter();
+        return this;
+    }
+    
     private void ensureRepositoryIsNotCreated() {
         checkState(repository == null && contentRepository == null,
                 "Repository was already created");

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/package-info.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/package-info.java?rev=1725250&r1=1725249&r2=1725250&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/package-info.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/package-info.java Mon Jan 18 12:16:29 2016
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-@Version("1.2.0")
+@Version("1.3.0")
 @Export(optional = "provide:=true")
 package org.apache.jackrabbit.oak.jcr;
 

Added: jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterClusterIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterClusterIT.java?rev=1725250&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterClusterIT.java (added)
+++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterClusterIT.java Mon Jan 18 12:16:29 2016
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.jcr;
+
+import static org.apache.jackrabbit.oak.plugins.atomic.AtomicCounterEditor.PROP_COUNTER;
+import static org.apache.jackrabbit.oak.plugins.atomic.AtomicCounterEditor.PROP_INCREMENT;
+import static org.apache.jackrabbit.oak.plugins.nodetype.NodeTypeConstants.MIX_ATOMIC_COUNTER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assume.assumeTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jcr.Node;
+import javax.jcr.Repository;
+import javax.jcr.Session;
+
+import org.apache.jackrabbit.oak.commons.FixturesHelper;
+import org.apache.jackrabbit.oak.commons.FixturesHelper.Fixture;
+import org.apache.jackrabbit.oak.plugins.atomic.AtomicCounterEditor;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.util.PerfLogger;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFutureTask;
+
+/**
+ * Cluster integration test: increments pushed concurrently through every cluster node must
+ * add up to the same {@link AtomicCounterEditor#PROP_COUNTER} value on all the nodes.
+ */
+public class AtomicCounterClusterIT extends DocumentClusterIT {
+    private static final Set<Fixture> FIXTURES = FixturesHelper.getFixtures();
+    private static final Logger LOG = LoggerFactory.getLogger(AtomicCounterClusterIT.class);
+    private static final PerfLogger LOG_PERF = new PerfLogger(LOG);
+    private List<CustomScheduledExecutor> executors = Lists.newArrayList();
+
+    @BeforeClass
+    public static void assumtions() {
+        assumeTrue(FIXTURES.contains(Fixture.DOCUMENT_NS));
+        assumeTrue(OakMongoNSRepositoryStub.isMongoDBAvailable());
+    }
+
+    @Override
+    public void before() throws Exception {
+        super.before();
+        executors = Lists.newArrayList();
+    }
+
+    @Test
+    public void increments() throws Exception {
+        setUpCluster(this.getClass(), mks, repos, NOT_PROVIDED);
+
+        assertEquals("repositories and executors should match", repos.size(), executors.size());
+
+        final String counterPath;
+        final Random rnd = new Random(14);
+        final AtomicLong expected = new AtomicLong(0);
+        final Map<String, Exception> exceptions = Collections.synchronizedMap(
+            new HashMap<String, Exception>());
+
+        // setting-up the repo state
+        Repository repo = repos.get(0);
+        Session session = repo.login(ADMIN);
+        Node counter;
+        try {
+            counter = session.getRootNode().addNode("counter");
+            counter.addMixin(MIX_ATOMIC_COUNTER);
+            session.save();
+            counterPath = counter.getPath();
+        } finally {
+            session.logout();
+        }
+
+        // allow the cluster to align
+        Thread.sleep(1500);
+
+        // asserting the initial state
+        assertFalse("Path to the counter node should be set", Strings.isNullOrEmpty(counterPath));
+        for (Repository r : repos) {
+            session = r.login(ADMIN);
+            try {
+                counter = session.getNode(counterPath);
+                assertEquals("Nothing should have touched the `expected`", 0, expected.get());
+                assertEquals(
+                    "Wrong initial counter",
+                    expected.get(),
+                    counter.getProperty(PROP_COUNTER).getLong());
+            } finally {
+                session.logout();
+            }
+        }
+
+        // number of threads per cluster node
+        final int numIncrements = Integer.getInteger("oak.test.it.atomiccounter.threads", 100);
+
+        LOG.debug(
+            "pushing {} increments per each of the {} cluster nodes for a total of {} concurrent updates",
+            numIncrements, repos.size(), numIncrements * repos.size());
+
+        // for each cluster node, `numIncrements` sessions pushing random increments
+        long start = LOG_PERF.start("Firing the threads");
+        List<ListenableFutureTask<Void>> tasks = Lists.newArrayList();
+        for (Repository rep : repos) {
+            final Repository r = rep;
+            for (int i = 0; i < numIncrements; i++) {
+                ListenableFutureTask<Void> task = ListenableFutureTask.create(new Callable<Void>() {
+                        @Override
+                        public Void call() throws Exception {
+                            Session s = r.login(ADMIN);
+                            try {
+                                try {
+                                    Node n = s.getNode(counterPath);
+                                    int increment = rnd.nextInt(10) + 1;
+                                    n.setProperty(PROP_INCREMENT, increment);
+                                    s.save();
+                                    // account for the increment only once it has been persisted
+                                    expected.addAndGet(increment);
+                                } finally {
+                                    s.logout();
+                                }
+                            } catch (Exception e) {
+                                exceptions.put(Thread.currentThread().getName(), e);
+                            }
+                            return null;
+                        }
+                });
+                new Thread(task).start();
+                tasks.add(task);
+            }
+        }
+        LOG_PERF.end(start, -1, "Firing threads completed", "");
+        Futures.allAsList(tasks).get();
+        LOG_PERF.end(start, -1, "Futures completed", "");
+
+        waitForTaskCompletion();
+        LOG_PERF.end(start, -1, "All tasks completed", "");
+
+        // allow time for the async process to kick in and run.
+        Thread.sleep(5000);
+
+        raiseExceptions(exceptions, LOG);
+
+        // assert the final situation
+        for (int i = 0; i < repos.size(); i++) {
+            Repository r = repos.get(i);
+            session = r.login(ADMIN);
+            try {
+                counter = session.getNode(counterPath);
+                LOG.debug("Cluster node: {}, actual counter: {}, expected counter: {}", i + 1,
+                    counter.getProperty(PROP_COUNTER).getLong(), expected.get());
+                assertEquals(
+                    "Wrong counter on node " + (i + 1),
+                    expected.get(),
+                    counter.getProperty(PROP_COUNTER).getLong());
+            } finally {
+                session.logout();
+            }
+        }
+    }
+
+    /** Blocks until no executor reports pending consolidator tasks, polling once per second. */
+    private void waitForTaskCompletion() throws InterruptedException {
+        int remainingTasks;
+        do {
+            remainingTasks = 0;
+            for (CustomScheduledExecutor e : executors) {
+                remainingTasks += e.getTotal();
+            }
+            if (remainingTasks > 0) {
+                LOG.debug("there are approximately {} tasks left to complete. Sleeping 1 sec",
+                    remainingTasks);
+                Thread.sleep(1000);
+            }
+        } while (remainingTasks > 0);
+    }
+
+    /** Executor keeping count of the consolidator tasks it was given and has not run yet. */
+    private static class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
+        private final AtomicInteger total = new AtomicInteger();
+
+        private class CustomTask<V> implements RunnableScheduledFuture<V> {
+            private final RunnableScheduledFuture<V> task;
+
+            public CustomTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
+                this.task = task;
+            }
+
+            @Override
+            public void run() {
+                task.run();
+                total.decrementAndGet();
+            }
+
+            @Override
+            public boolean cancel(boolean mayInterruptIfRunning) {
+                return task.cancel(mayInterruptIfRunning);
+            }
+
+            @Override
+            public boolean isCancelled() {
+                return task.isCancelled();
+            }
+
+            @Override
+            public boolean isDone() {
+                return task.isDone();
+            }
+
+            @Override
+            public V get() throws InterruptedException, ExecutionException {
+                return task.get();
+            }
+
+            @Override
+            public V get(long timeout, TimeUnit unit) throws InterruptedException,
+                                                     ExecutionException, TimeoutException {
+                return task.get(timeout, unit);
+            }
+
+            @Override
+            public long getDelay(TimeUnit unit) {
+                return task.getDelay(unit);
+            }
+
+            @Override
+            public int compareTo(Delayed o) {
+                return task.compareTo(o);
+            }
+
+            @Override
+            public boolean isPeriodic() {
+                return task.isPeriodic();
+            }
+        }
+
+        public CustomScheduledExecutor(int corePoolSize) {
+            super(corePoolSize);
+        }
+
+        @Override
+        protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable,
+                                                              RunnableScheduledFuture<V> task) {
+            if (callable instanceof AtomicCounterEditor.ConsolidatorTask) {
+                total.incrementAndGet();
+                return new CustomTask<V>(callable, task);
+            } else {
+                return super.decorateTask(callable, task);
+            }
+        }
+
+        /**
+         * Returns the approximate number of consolidator tasks still to be completed.
+         * @return tasks decorated by this executor that have not finished running yet
+         */
+        public int getTotal() {
+            return total.get();
+        }
+    }
+
+    @Override
+    protected Jcr getJcr(NodeStore store) {
+        CustomScheduledExecutor e = new CustomScheduledExecutor(10);
+        executors.add(e);
+        return super.getJcr(store)
+            .with(e)
+            .withAtomicCounter();
+    }
+}

Modified: jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterIT.java?rev=1725250&r1=1725249&r2=1725250&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterIT.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterIT.java Mon Jan 18 12:16:29 2016
@@ -118,4 +118,9 @@ public class AtomicCounterIT extends Abs
         new Thread(task).start();
         return task;
     }
+
+    @Override
+    protected Jcr initJcr(Jcr jcr) {
+        return super.initJcr(jcr).withAtomicCounter();
+    }
 }

Modified: jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterTest.java?rev=1725250&r1=1725249&r2=1725250&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterTest.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterTest.java Mon Jan 18 12:16:29 2016
@@ -124,5 +124,9 @@ public class AtomicCounterTest extends A
             session.logout();
         }
     }
-    
+
+    @Override
+    protected Jcr initJcr(Jcr jcr) {
+        return super.initJcr(jcr).withAtomicCounter();
+    }
 }

Modified: jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java?rev=1725250&r1=1725249&r2=1725250&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java Mon Jan 18 12:16:29 2016
@@ -17,7 +17,6 @@
 package org.apache.jackrabbit.oak.jcr;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest.dispose;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -34,6 +33,8 @@ import javax.jcr.SimpleCredentials;
 import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
 import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
 import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
+import org.apache.jackrabbit.oak.spi.state.Clusterable;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.junit.After;
 import org.junit.Before;
 import org.slf4j.Logger;
@@ -76,6 +77,10 @@ public abstract class DocumentClusterIT
         mk.dispose(); // closes connection as well
     }
 
+    protected void dispose(@Nonnull Repository repo) {
+        AbstractRepositoryTest.dispose(checkNotNull(repo));
+    }
+    
     @After
     public void after() throws Exception {
         for (Repository repo : repos) {
@@ -126,7 +131,8 @@ public abstract class DocumentClusterIT
     }
     
     /**
-     * set up the cluster connections
+     * set up the cluster connections. Same as {@link #setUpCluster(Class, List, List, int)}
+     * providing {@link #NOT_PROVIDED} as {@code asyncDelay}
      * 
      * @param clazz class used for logging into Mongo itself
      * @param mks the list of mks to work on.
@@ -139,6 +145,17 @@ public abstract class DocumentClusterIT
         setUpCluster(clazz, mks, repos, NOT_PROVIDED);
     }
 
+    /**
+     * set up the cluster connections
+     * 
+     * @param clazz class used for logging into Mongo itself
+     * @param mks the list of mks to work on
+     * @param repos list of {@link Repository} created on each {@code mks}
+     * @param asyncDelay the maximum delay for the cluster to sync with last revision. Use
+     *            {@link #NOT_PROVIDED} for implementation default. Use {@code 0} for switching to
+     *            manual and sync with {@link #alignCluster(List)}.
+     * @throws Exception
+     */
     void setUpCluster(@Nonnull final Class<?> clazz, 
                              @Nonnull final List<DocumentMK> mks,
                              @Nonnull final List<Repository> repos,
@@ -185,7 +202,7 @@ public abstract class DocumentClusterIT
         builder.setClusterId(clusterId);
         
         DocumentMK mk = builder.open();
-        Jcr j = new Jcr(mk.getNodeStore());
+        Jcr j = getJcr(mk.getNodeStore());
         
         Set<IndexEditorProvider> ieps = additionalIndexEditorProviders();
         if (ieps != null) {
@@ -204,6 +221,14 @@ public abstract class DocumentClusterIT
         checkNotNull(mks).add(mk);
     }
     
+    protected Jcr getJcr(@Nonnull NodeStore store) {
+        Jcr j = new Jcr(checkNotNull(store));
+        if (store instanceof Clusterable) {
+            j.with((Clusterable) store);
+        }
+        return j;
+    }
+    
     /**
      * <p>
      * the default {@link #initRepository(Class, List, List, int, int)} uses this for registering



Re: svn commit: r1725250 - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/atomic/ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/ oak-jcr/src/main/java/o...

Posted by Chetan Mehrotra <ch...@gmail.com>.
Hi Davide,

On Mon, Jan 18, 2016 at 5:46 PM,  <da...@apache.org> wrote:
> +     */
> +    public AtomicCounterEditorProvider() {
> +        clusterSupplier = new Supplier<Clusterable>() {
> +            @Override
> +            public Clusterable get() {
> +                return cluster.get();
> +            }
> +        };
> +        schedulerSupplier = new Supplier<ScheduledExecutorService>() {
> +            @Override
> +            public ScheduledExecutorService get() {
> +                return scheduler.get();
> +            }
> +        };
> +        storeSupplier = new Supplier<NodeStore>() {
> +            @Override
> +            public NodeStore get() {
> +                return store.get();
> +            }
> +        };
> +        wbSupplier = new Supplier<Whiteboard>() {
> +            @Override
> +            public Whiteboard get() {
> +                return whiteboard.get();
> +            }
> +        };
> +    }

Just curious about the use of the above approach. Is it for keeping the
dependencies non-static, or for using final instance variables? If you
mark the references as static, then all those bind and unbind methods would
not be required, as by the time the component is active the dependencies
would already be set.


Chetan Mehrotra