You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2013/12/03 06:43:03 UTC

svn commit: r1547289 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak: Oak.java plugins/index/AsyncIndexUpdate.java spi/whiteboard/WhiteboardUtils.java

Author: chetanm
Date: Tue Dec  3 05:43:03 2013
New Revision: 1547289

URL: http://svn.apache.org/r1547289
Log:
OAK-1246 - Make the AsyncIndexUpdate task run only on a single node in a cluster

Implemented the support by making use of:
1. The async-status flag, along with timeout handling
2. Restricting the scheduler to run the job on only one node. The default scheduler
   does not support this; however, it would be used when running in, say, Sling

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/WhiteboardUtils.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java?rev=1547289&r1=1547288&r2=1547289&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java Tue Dec  3 05:43:03 2013
@@ -406,7 +406,7 @@ public class Oak {
 
         if (asyncIndexing) {
             Runnable task = new AsyncIndexUpdate("async", store, indexEditors);
-            WhiteboardUtils.scheduleWithFixedDelay(whiteboard, task, 5);
+            WhiteboardUtils.scheduleWithFixedDelay(whiteboard, task, 5, true);
         }
 
         // FIXME: OAK-810 move to proper workspace initialization

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1547289&r1=1547288&r2=1547289&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Tue Dec  3 05:43:03 2013
@@ -32,6 +32,7 @@ import com.google.common.base.Objects;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.value.Conversions;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.EditorDiff;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
@@ -58,6 +59,12 @@ public class AsyncIndexUpdate implements
     private static final CommitFailedException CONCURRENT_UPDATE = new CommitFailedException(
             "Async", 1, "Concurrent update detected");
 
+    /**
+     * Timeout in minutes after which an async job would be considered as timed out. Another
+     * node in cluster would wait for timeout before taking over a running job
+     */
+    private static final int ASYNC_TIMEOUT = 15;
+
     private final String name;
 
     private final NodeStore store;
@@ -86,6 +93,11 @@ public class AsyncIndexUpdate implements
             return;
         }
 
+        if(isAlreadyRunning(store)){
+            log.debug("Async job found to be already running. Skipping");
+            return;
+        }
+
         preAsyncRun(store);
 
         NodeBuilder builder = store.getRoot().builder();
@@ -151,6 +163,34 @@ public class AsyncIndexUpdate implements
         }
     }
 
+    private boolean isAlreadyRunning(NodeStore store) {
+        NodeState indexState = store.getRoot().getChildNode(IndexConstants.INDEX_DEFINITIONS_NAME);
+
+        //Probably the first run
+        if (!indexState.exists()) {
+            return false;
+        }
+
+        //Check if already running or timed out
+        if ("running".equals(indexState.getString("async-status"))) {
+            PropertyState startTime = indexState.getProperty("async-start");
+            Calendar start = Conversions.convert(startTime.getValue(Type.DATE)).toCalendar();
+            Calendar now = Calendar.getInstance();
+            long delta = now.getTimeInMillis() - start.getTimeInMillis();
+
+            //Check if the job has timed out and we need to take over
+            if (TimeUnit.MILLISECONDS.toMinutes(delta) > ASYNC_TIMEOUT) {
+                log.info("Async job found which stated on {} has timed out in {} minutes. " +
+                        "This node would take over the job.",
+                        startTime.getValue(Type.DATE), ASYNC_TIMEOUT);
+                return false;
+            }
+            return true;
+        }
+
+        return false;
+    }
+
     private static void preAsyncRunStatus(NodeBuilder builder) {
         builder.getChildNode(IndexConstants.INDEX_DEFINITIONS_NAME)
                 .setProperty("async-status", "running")

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/WhiteboardUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/WhiteboardUtils.java?rev=1547289&r1=1547288&r2=1547289&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/WhiteboardUtils.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/WhiteboardUtils.java Tue Dec  3 05:43:03 2013
@@ -29,12 +29,21 @@ public class WhiteboardUtils {
     private static final AtomicLong COUNTER = new AtomicLong();
 
     public static Registration scheduleWithFixedDelay(
-            Whiteboard whiteboard, Runnable runnable, long delay) {
+            Whiteboard whiteboard, Runnable runnable, long delayInSeconds) {
+        return scheduleWithFixedDelay(whiteboard, runnable, delayInSeconds, false);
+    }
+
+    public static Registration scheduleWithFixedDelay(
+            Whiteboard whiteboard, Runnable runnable, long delayInSeconds, boolean runOnSingleClusterNode) {
+        ImmutableMap.Builder<String,Object> builder = ImmutableMap.<String,Object>builder()
+                .put("scheduler.period", delayInSeconds)
+                .put("scheduler.concurrent", false);
+        if (runOnSingleClusterNode) {
+            //Make use of feature while running in Sling SLING-2979
+            builder.put("scheduler.runOn", "SINGLE");
+        }
         return whiteboard.register(
-                Runnable.class, runnable, ImmutableMap.builder()
-                    .put("scheduler.period", delay)
-                    .put("scheduler.concurrent", false)
-                    .build());
+                Runnable.class, runnable, builder.build());
     }
 
     public static <T> Registration registerMBean(