Posted to oak-commits@jackrabbit.apache.org by st...@apache.org on 2015/08/24 13:25:49 UTC

svn commit: r1697355 [1/2] - in /jackrabbit/oak/trunk/oak-core: ./ src/main/java/org/apache/jackrabbit/oak/plugins/document/ src/test/java/org/apache/jackrabbit/oak/plugins/document/

Author: stefanegli
Date: Mon Aug 24 11:25:48 2015
New Revision: 1697355

URL: http://svn.apache.org/r1697355
Log:
OAK-2844 : Introducing a simple discovery-lite service - currently only available on DocumentMK - exposes a new repository descriptor oak.discoverylite.clusterview which lists all active/deactivating/inactive instances connected to the same document backend
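
For illustration only: the descriptor can be read through the standard javax.jcr API once a repository backed by a DocumentNodeStore is available. The values in the comment below are hypothetical and merely follow the field layout produced by ClusterView (seq, final, id, me, active, deactivating, inactive).

    import javax.jcr.Repository;

    public class DiscoveryLiteDescriptorSample {
        // prints the raw JSON value of the discovery-lite cluster view descriptor
        public static void printClusterView(Repository repository) {
            String json = repository.getDescriptor("oak.discoverylite.clusterview");
            // e.g. {"seq":7,"final":true,"id":"<uuid>","me":2,
            //       "active":[1,2],"deactivating":[],"inactive":[3]}
            System.out.println(json);
        }
    }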

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterStateChangeListener.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterView.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocumentTest.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewTest.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/pom.xml
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java

Modified: jackrabbit/oak/trunk/oak-core/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/pom.xml?rev=1697355&r1=1697354&r2=1697355&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-core/pom.xml Mon Aug 24 11:25:48 2015
@@ -317,6 +317,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>junit-addons</groupId>
+      <artifactId>junit-addons</artifactId>
+      <version>1.4</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
       <version>1.9.5</version>

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java?rev=1697355&r1=1697354&r2=1697355&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java Mon Aug 24 11:25:48 2015
@@ -67,6 +67,14 @@ public class ClusterNodeInfo {
     public static final String LEASE_END_KEY = "leaseEnd";
 
     /**
+     * The key for the root-revision of the last background write (of unsaved
+     * modifications) - that is, the last root-revision written by the instance
+     * itself on a clean shutdown, or by another instance during recovery in
+     * case of a crash
+     */
+    public static final String LAST_WRITTEN_ROOT_REV_KEY = "lastWrittenRootRev";
+
+    /**
      * The state of the cluster. On proper shutdown the state should be cleared.
      *
      * @see org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java?rev=1697355&r1=1697354&r2=1697355&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java Mon Aug 24 11:25:48 2015
@@ -77,4 +77,11 @@ public class ClusterNodeInfoDocument ext
     private RecoverLockState getRecoveryState(){
         return RecoverLockState.fromString((String) get(ClusterNodeInfo.REV_RECOVERY_LOCK));
     }
+
+    /**
+     * the root-revision of the last background write (of unsaved modifications)
+     **/
+    public String getLastWrittenRootRev() {
+        return (String) get(ClusterNodeInfo.LAST_WRITTEN_ROOT_REV_KEY);
+    }
 }

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterStateChangeListener.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterStateChangeListener.java?rev=1697355&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterStateChangeListener.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterStateChangeListener.java Mon Aug 24 11:25:48 2015
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+/**
+ * DocumentNS-internal listener that gets invoked when a change in the
+ * clusterNodes collection (active/inactive/timed out/recovering) is detected.
+ */
+public interface ClusterStateChangeListener {
+
+    /**
+     * Informs the listener that DocumentNodeStore has discovered a change in
+     * the clusterNodes collection.
+     */
+    public void handleClusterStateChange();
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterStateChangeListener.java
------------------------------------------------------------------------------
    svn:eol-style = native
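
A minimal sketch of how this listener is meant to be used; the logging implementation below is made up for illustration, but the registration call is the same one DocumentDiscoveryLiteService.activate uses further down in this commit.

    package org.apache.jackrabbit.oak.plugins.document;

    // toy listener that merely logs; DocumentNodeStore invokes it whenever it
    // detects a change in the clusterNodes collection
    class LoggingClusterStateChangeListener implements ClusterStateChangeListener {

        @Override
        public void handleClusterStateChange() {
            System.out.println("clusterNodes collection changed - re-check the cluster view");
        }
    }

    // registration, e.g. during service activation:
    //     documentNodeStore.setClusterStateChangeListener(new LoggingClusterStateChangeListener());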

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterView.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterView.java?rev=1697355&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterView.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterView.java Mon Aug 24 11:25:48 2015
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.jackrabbit.oak.commons.json.JsopBuilder;
+
+/**
+ * A ClusterView represents the state of a cluster at a particular moment in
+ * time.
+ * <p>
+ * This is a combination of what is stored in the ClusterViewDocument and the
+ * list of instances that currently have a backlog.
+ * <p>
+ * In order to be able to differentiate and clearly identify the different
+ * states an instance is in, the ClusterView uses a slightly different
+ * terminology of states that it reports:
+ * <ul>
+ * <li>Active: (same as in the ClusterViewDocument) an instance that is alive
+ * and has no recoveryLock set. Whether or not the lease has timed out is
+ * ignored. If the lease has timed out, this will shortly be noticed
+ * by one of the instances and the affected instance will thus be recovered
+ * soon.</li>
+ * <li>Deactivating: An instance that is either recovering (which is the state
+ * reported from the ClusterViewDocument) - ie it was active until now, but the
+ * lease has just timed out and a peer instance that noticed this is now doing
+ * a recovery - or it is inactive but some of its changes are still in the
+ * backlog (the latter is not tracked in the ClusterViewDocument, instead
+ * instances with a backlog are in the 'inactive' bucket there).</li>
+ * <li>Inactive: An instance that is both inactive from a
+ * clusterNodes/ClusterViewDocument point of view (ie no longer active and
+ * already recovered) and it has no backlog anymore.</li>
+ * </ul>
+ * The JSON generated by the ClusterView (which is propagated to JMX) has the
+ * following fields:
+ * <ul>
+ * <li>seq = sequence number: this is a monotonically increasing number assigned
+ * to each incarnation of the persisted clusterView (in the settings
+ * collection). It can be used to take note of the fact that a view has changed
+ * even though perhaps all activeIds are still the same (eg when the listener
+ * has missed a few changes). It can also be used to tell with certainty
+ * that 'something has changed' compared to the clusterView with a previous
+ * sequence number.</li>
+ * <li>final = is final: this is a boolean indicating whether or not the view
+ * with a particular sequence number is final (not going to change anymore) or
+ * whether discovery-lite reserves the right to modify the view in the future
+ * (false). So whenever 'final' is false, the view must be treated as 'in
+ * flux' and the user should perhaps defer drawing any conclusions. That is not
+ * to say that the information provided in active/deactivating/inactive is
+ * wrong when 'final' is false - that is of course not the case - that
+ * info is always correct. But when 'final' is false it just means that
+ * active/deactivating/inactive for a given sequence number might still change.</li>
+ * <li>id = cluster view id: this is the unique, stable identifier of the local
+ * cluster. The idea of this id is to provide both an actual identifier for the
+ * local cluster as well as a 'namespace' for the instanceIds therein. The
+ * instanceIds are all just simply integers and can of course be the same for
+ * instances in different clusters.</li>
+ * <li>me = my local instance id: this is the id of the local instance as
+ * managed by DocumentNodeStore</li>
+ * <li>active = active instance ids: this is the list of instance ids that are
+ * all currently active in the local cluster. The ids are managed by
+ * DocumentNodeStore</li>
+ * <li>deactivating = deactivating instance ids: this is the list of instance
+ * ids that are all in the process of deactivating and for which therefore some
+ * data might still be making its way to the local instance. So any changes that
+ * were done by instances that are deactivating might not yet be visible locally
+ * </li>
+ * <li>inactive = inactive instance ids: this is the list of instance ids that
+ * are neither running nor have any data pending to become visible to the
+ * local instance</li>
+ * </ul>
+ */
+class ClusterView {
+
+    /**
+     * the json containing the complete information of the state of this
+     * ClusterView. Created at constructor time for performance reasons (json
+     * will be polled via JMX very frequently, thus must be provided fast)
+     */
+    private final String json;
+
+    /**
+     * Factory method that creates a ClusterView given a ClusterViewDocument and
+     * a list of instances that currently have a backlog.
+     * <p>
+     * The ClusterViewDocument contains instances in the following states:
+     * <ul>
+     * <li>active</li>
+     * <li>recovering</li>
+     * <li>inactive</li>
+     * </ul>
+     * The ClusterView however reports these upwards as follows:
+     * <ul>
+     * <li>active: this is 1:1 the active ones from the ClusterViewDocument</li>
+     * <li>deactivating: this includes the recovering ones from the
+     * ClusterViewDocument plus those passed to this method in the backlogIds
+     * parameter</li>
+     * <li>inactive: this is the inactive ones from the ClusterViewDocument
+     * <b>minus</b> the backlogIds passed</li>
+     * </ul>
+     * 
+     * @param localInstanceId
+     *            the id of the local instance (me)
+     * @param clusterViewDoc
+     *            the ClusterViewDocument which contains the currently persisted
+     *            cluster view
+     * @param backlogIds
+     *            the ids for which the local instance has not yet finished a
+     *            background read and which thus still have a backlog
+     * @return the ClusterView representing the provided info
+     */
+    static ClusterView fromDocument(int localInstanceId, ClusterViewDocument clusterViewDoc, Set<Integer> backlogIds) {
+        Set<Integer> activeIds = clusterViewDoc.getActiveIds();
+        Set<Integer> deactivatingIds = new HashSet<Integer>();
+        deactivatingIds.addAll(clusterViewDoc.getRecoveringIds());
+        deactivatingIds.addAll(backlogIds);
+        Set<Integer> inactiveIds = new HashSet<Integer>();
+        inactiveIds.addAll(clusterViewDoc.getInactiveIds());
+        if (!inactiveIds.removeAll(backlogIds) && backlogIds.size() > 0) {
+            // then not all backlogIds were listed as inactive - which is
+            // contrary to the expectation -
+            // in which case we indeed throw a paranoia exception here:
+            throw new IllegalStateException(
+                    "not all backlogIds (" + backlogIds + ") are part of inactiveIds (" + clusterViewDoc.getInactiveIds() + ")");
+        }
+        return new ClusterView(clusterViewDoc.getViewSeqNum(), backlogIds.size() == 0, clusterViewDoc.getClusterViewId(),
+                localInstanceId, activeIds, deactivatingIds, inactiveIds);
+    }
+
+    ClusterView(final long viewSeqNum, final boolean viewFinal, final String clusterViewId, final int localId,
+            final Set<Integer> activeIds, final Set<Integer> deactivatingIds, final Set<Integer> inactiveIds) {
+        if (viewSeqNum < 0) {
+            throw new IllegalStateException("viewSeqNum must be zero or higher: " + viewSeqNum);
+        }
+        if (clusterViewId == null || clusterViewId.length() == 0) {
+            throw new IllegalStateException("clusterViewId must not be zero or empty: " + clusterViewId);
+        }
+        if (localId < 0) {
+            throw new IllegalStateException("localId must not be zero or higher: " + localId);
+        }
+        if (activeIds == null || activeIds.size() == 0) {
+            throw new IllegalStateException("activeIds must not be null or empty");
+        }
+        if (deactivatingIds == null) {
+            throw new IllegalStateException("deactivatingIds must not be null");
+        }
+        if (inactiveIds == null) {
+            throw new IllegalStateException("inactiveIds must not be null");
+        }
+
+        json = asJson(viewSeqNum, viewFinal, clusterViewId, localId, activeIds, deactivatingIds, inactiveIds);
+    }
+
+    /**
+     * Converts the provided parameters into the clusterview json that will be
+     * provided via JMX
+     **/
+    private String asJson(final long viewSeqNum, final boolean viewFinal, final String clusterViewId, final int localId,
+            final Set<Integer> activeIds, final Set<Integer> deactivatingIds, final Set<Integer> inactiveIds) {
+        JsopBuilder builder = new JsopBuilder();
+        builder.object();
+        builder.key("seq").value(viewSeqNum);
+        builder.key("final").value(viewFinal);
+        builder.key("id").value(clusterViewId);
+        builder.key("me").value(localId);
+        builder.key("active").array();
+        for (Iterator<Integer> it = activeIds.iterator(); it.hasNext();) {
+            Integer anInstance = it.next();
+            builder.value(anInstance);
+        }
+        builder.endArray();
+        builder.key("deactivating").array();
+        for (Iterator<Integer> it = deactivatingIds.iterator(); it.hasNext();) {
+            Integer anInstance = it.next();
+            builder.value(anInstance);
+        }
+        builder.endArray();
+        builder.key("inactive").array();
+        for (Iterator<Integer> it = inactiveIds.iterator(); it.hasNext();) {
+            Integer anInstance = it.next();
+            builder.value(anInstance);
+        }
+        builder.endArray();
+        builder.endObject();
+        return builder.toString();
+    }
+
+    /** Debugging toString() **/
+    @Override
+    public String toString() {
+        return "a ClusterView[" + json + "]";
+    }
+
+    /** This is the main getter that will be polled via JMX **/
+    String asDescriptorValue() {
+        return json;
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterView.java
------------------------------------------------------------------------------
    svn:eol-style = native
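
A test-style sketch (placed in the same package, since the constructor is package-private) of how the documented state mapping surfaces in the descriptor JSON; the ids, seqNum and uuid are made up for illustration.

    package org.apache.jackrabbit.oak.plugins.document;

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    class ClusterViewSample {
        static String sampleJson() {
            Set<Integer> active = new HashSet<Integer>(Arrays.asList(1, 2));
            // recovering and/or backlog instances end up in 'deactivating'
            Set<Integer> deactivating = new HashSet<Integer>(Arrays.asList(3));
            Set<Integer> inactive = new HashSet<Integer>(Arrays.asList(4));
            ClusterView view = new ClusterView(7L, false /* not final: may still change */,
                    "example-uuid", 1, active, deactivating, inactive);
            // roughly: {"seq":7,"final":false,"id":"example-uuid","me":1,
            //           "active":[1,2],"deactivating":[3],"inactive":[4]}
            return view.asDescriptorValue();
        }
    }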

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java?rev=1697355&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java Mon Aug 24 11:25:48 2015
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.commons.json.JsopBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents the document stored in the settings collection containing a
+ * 'cluster view'.
+ * <p>
+ * A 'cluster view' is the state of the membership of instances that are or have
+ * ever been connected to the same oak repository. The 'cluster view' is
+ * maintained by all instances in the cluster concurrently - the faster one
+ * wins. Its information is derived from the clusterNodes collection. From there
+ * the following three states are derived and instances are grouped into these:
+ * <ul>
+ * <li>Active: an instance that is active and currently has no recoveryLock
+ * acquired. The lease timeout is ignored. When the lease times out, this is
+ * noticed by one of the instances at some point and a recovery is started, at
+ * which point the instance transitions from 'Active' to 'Recovering'.</li>
+ * <li>Recovering: an instance that was active but currently has the
+ * recoveryLock acquired by one of the instances.</li>
+ * <li>Inactive: an instance that is not set to active (in which case the
+ * recoveryLock is never set)</li>
+ * </ul>
+ * <p>
+ * Note that the states managed in this ClusterViewDocument differ from those
+ * of ClusterView - since ClusterView also manages the fact that after a
+ * recovery of a crashed instance there could be a 'backlog' of changes which it
+ * doesn't yet see until a background read is performed.
+ */
+class ClusterViewDocument {
+
+    private static final Logger logger = LoggerFactory.getLogger(ClusterViewDocument.class);
+
+    /** the id of this document is always 'clusterView' **/
+    private static final String CLUSTERVIEW_DOC_ID = "clusterView";
+
+    // keys that we store in the root document - and in the history
+    /**
+     * document key that stores the stable id of the cluster (will never change)
+     * (Note: a better term would have been just clusterId - but that one is
+     * already occupied with what should actually be called clusterNodeId or
+     * just nodeId)
+     **/
+    static final String CLUSTER_VIEW_ID_KEY = "clusterViewId";
+
+    /**
+     * document key that stores the monotonically incrementing sequence number
+     * of the cluster view. Any update will increase this by 1
+     **/
+    static final String VIEW_SEQ_NUM_KEY = "seqNum";
+
+    /**
+     * document key that stores the comma-separated list of active instance ids
+     **/
+    static final String ACTIVE_KEY = "active";
+
+    /**
+     * document key that stores the comma-separated list of inactive instance
+     * ids (they might still have a backlog; that is handled in ClusterView
+     * though, and never persisted)
+     */
+    static final String INACTIVE_KEY = "inactive";
+
+    /**
+     * document key that stores the comma-separated list of recovering instance
+     * ids
+     **/
+    static final String RECOVERING_KEY = "recovering";
+
+    /**
+     * document key that stores the date and time when this view was created -
+     * for debugging purpose only
+     **/
+    private static final String CREATED_KEY = "created";
+
+    /**
+     * document key that stores the id of the instance that created this view -
+     * for debugging purpose only
+     **/
+    private static final String CREATOR_KEY = "creator";
+
+    /**
+     * document key that stores the date and time when this view was retired -
+     * for debugging purpose only
+     **/
+    private static final String RETIRED_KEY = "retired";
+
+    /**
+     * document key that stores the id of the instance that retired this view -
+     * for debugging purpose only
+     **/
+    private static final String RETIRER_KEY = "retirer";
+
+    /**
+     * document key that stores a short, limited history of previous cluster
+     * views - for debugging purpose only
+     **/
+    private static final String CLUSTER_VIEW_HISTORY_KEY = "clusterViewHistory";
+
+    /** the format used when storing date+time **/
+    private static final DateFormat standardDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+
+    /** number of elements kept in the CLUSTER_VIEW_HISTORY_KEY field **/
+    private static final int HISTORY_LIMIT = 10;
+
+    /** the monotonically incrementing sequence number of this cluster view **/
+    private final long viewSeqNum;
+
+    /** the stable id of this cluster **/
+    private final String clusterViewId;
+
+    /** the ids of instances that are active at this moment **/
+    private final Integer[] activeIds;
+
+    /**
+     * the ids of instances that are recovering (lastRev-recovery) at this
+     * moment
+     **/
+    private final Integer[] recoveringIds;
+
+    /** the ids of instances that are inactive at this moment **/
+    private final Integer[] inactiveIds;
+
+    /**
+     * the short, limited history of previous cluster views, for debugging only
+     **/
+    private final Map<Object, String> viewHistory;
+
+    /** the date+time at which this view was created, for debugging only **/
+    private final String createdAt;
+
+    /** the id of the instance that created this view, for debugging only **/
+    private final Integer createdBy;
+
+    /**
+     * Main method by which the ClusterViewDocument is updated in the settings
+     * collection
+     * 
+     * @return the resulting ClusterViewDocument as just updated in the settings
+     *         collection - or null if another instance was updating the
+     *         clusterview concurrently (in which case the caller should re-read
+     *         first and possibly re-update if needed)
+     */
+    static ClusterViewDocument readOrUpdate(DocumentNodeStore documentNodeStore, Set<Integer> activeIds, Set<Integer> recoveringIds,
+            Set<Integer> inactiveIds) {
+        logger.trace("readOrUpdate: expected: activeIds: {}, recoveringIds: {}, inactiveIds: {}", activeIds, recoveringIds,
+                inactiveIds);
+        if (activeIds == null || activeIds.size() == 0) {
+            logger.info("readOrUpdate: activeIds must not be null or empty");
+            throw new IllegalStateException("activeIds must not be null or empty");
+        }
+        int localClusterId = documentNodeStore.getClusterId();
+
+        final ClusterViewDocument previousView = doRead(documentNodeStore);
+        if (previousView != null) {
+            if (previousView.matches(activeIds, recoveringIds, inactiveIds)) {
+                logger.trace("readOrUpdate: view unchanged, returning: {}", previousView);
+                return previousView;
+            }
+        }
+        logger.trace(
+                "readOrUpdate: view change detected, going to update from {} to activeIds: {}, recoveringIds: {}, inactiveIds: {}",
+                previousView, activeIds, recoveringIds, inactiveIds);
+        UpdateOp updateOp = new UpdateOp(CLUSTERVIEW_DOC_ID, true);
+        Date now = new Date();
+        updateOp.set(ACTIVE_KEY, setToCsv(activeIds));
+        updateOp.set(RECOVERING_KEY, setToCsv(recoveringIds));
+        updateOp.set(INACTIVE_KEY, setToCsv(inactiveIds));
+        updateOp.set(CREATED_KEY, standardDateFormat.format(now));
+        updateOp.set(CREATOR_KEY, localClusterId);
+        Map<Object, String> historyMap = new HashMap<Object, String>();
+        if (previousView != null) {
+            Map<Object, String> previousHistory = previousView.getHistory();
+            if (previousHistory != null) {
+                historyMap.putAll(previousHistory);
+            }
+
+            historyMap.put(Revision.newRevision(localClusterId).toString(), asHistoryEntry(previousView, localClusterId, now));
+        }
+        applyHistoryLimit(historyMap);
+        updateOp.set(CLUSTER_VIEW_HISTORY_KEY, historyMap);
+
+        final Long newViewSeqNum;
+        if (previousView == null) {
+            // looks like we are the first ever to define the
+            // clusterView -
+            // so we can use viewId==1 and we make sure no other cluster node
+            // tries to create this first one simultaneously - so we use
+            // 'create'
+
+            // going via 'create' requires ID to be set again (not only in new
+            // UpdateOp(id,isNew)):
+            updateOp.set(Document.ID, CLUSTERVIEW_DOC_ID);
+            ArrayList<UpdateOp> updateOps = new ArrayList<UpdateOp>();
+            newViewSeqNum = 1L;
+            updateOp.setNew(true); // paranoia as that's already set above
+            updateOp.set(VIEW_SEQ_NUM_KEY, newViewSeqNum);
+            // first view ever => choose a new unique clusterViewId
+            String clusterViewId = UUID.randomUUID().toString();
+            updateOp.set(CLUSTER_VIEW_ID_KEY, clusterViewId);
+            updateOps.add(updateOp);
+            logger.debug("updateAndRead: trying to create the first ever clusterView - hence {}={} and {}={}", VIEW_SEQ_NUM_KEY,
+                    newViewSeqNum, CLUSTER_VIEW_ID_KEY, clusterViewId);
+            if (!documentNodeStore.getDocumentStore().create(Collection.SETTINGS, updateOps)) {
+                logger.debug("updateAndRead: someone else just created the first view ever while I tried - reread that one later");
+                return null;
+            }
+        } else {
+            // there were earlier clusterViews (the normal case) - thus we
+            // use 'findAndUpdate' with the condition that
+            // the view seqNum is still the previous view's one
+            Long previousViewSeqNum = previousView.getViewSeqNum();
+            updateOp.setNew(false); // change to false from true above
+            updateOp.equals(VIEW_SEQ_NUM_KEY, null, previousViewSeqNum);
+            newViewSeqNum = previousViewSeqNum + 1;
+            updateOp.set(VIEW_SEQ_NUM_KEY, newViewSeqNum);
+            logger.debug("updateAndRead: trying to update the clusterView to {}={} ", VIEW_SEQ_NUM_KEY, newViewSeqNum);
+            if (documentNodeStore.getDocumentStore().findAndUpdate(Collection.SETTINGS, updateOp) == null) {
+                logger.debug(
+                        "updateAndRead: someone else just updated the view which I wanted to do as well - reread that one later");
+                return null;
+            }
+        }
+
+        // whatever the outcome of the above - we don't care -
+        // re-reading will in any case definitely show what has been persisted
+        // and if the re-read view contains the same seqNum, it is what we have
+        // written
+        // - otherwise someone else came in between and we have to step back and
+        // retry
+        ClusterViewDocument readResult = doRead(documentNodeStore);
+        if (readResult == null) {
+            logger.debug("updateAndRead: got null from read - whatever the exact reason, we must retry in a moment.");
+            return null;
+        } else if (newViewSeqNum.equals(readResult.getViewSeqNum())) {
+            logger.debug("updateAndRead: matching view - no change");
+            return readResult;
+        } else {
+            logger.debug("updateAndRead: someone else in the cluster was updating right after I also succeeded - re-read in a bit");
+            return null;
+        }
+    }
+
+    /**
+     * Pruning method that makes sure the history never gets larger than
+     * HISTORY_LIMIT
+     * 
+     * @param historyMap
+     *            the pruning is done directly on this map
+     */
+    private static void applyHistoryLimit(Map<Object, String> historyMap) {
+        while (historyMap.size() > HISTORY_LIMIT) {
+            // remove the oldest
+            String oldestRevision = null;
+            for (Iterator<Object> it = historyMap.keySet().iterator(); it.hasNext();) {
+                Object obj = it.next();
+                // obj can be a String or a Revision
+                // in case of it being a Revision the toString() will
+                // be appropriate, hence:
+                String r = obj.toString();
+                if (oldestRevision == null) {
+                    oldestRevision = r;
+                } else if (Revision.getTimestampDifference(Revision.fromString(r), Revision.fromString(oldestRevision)) < 0) {
+                    oldestRevision = r;
+                }
+            }
+            if (oldestRevision == null) {
+                break;
+            } else {
+                if (historyMap.remove(oldestRevision) == null) {
+                    if (historyMap.remove(Revision.fromString(oldestRevision)) == null) {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    /** Converts a previous clusterView document into a history 'string' **/
+    private static String asHistoryEntry(final ClusterViewDocument previousView, int retiringClusterNodeId, Date retireTime) {
+        String h;
+        JsopBuilder b = new JsopBuilder();
+        b.object();
+        b.key(VIEW_SEQ_NUM_KEY);
+        b.value(previousView.getViewSeqNum());
+        b.key(CREATED_KEY);
+        b.value(String.valueOf(previousView.getCreatedAt()));
+        b.key(CREATOR_KEY);
+        b.value(previousView.getCreatedBy());
+        b.key(RETIRED_KEY);
+        b.value(String.valueOf(standardDateFormat.format(retireTime)));
+        b.key(RETIRER_KEY);
+        b.value(retiringClusterNodeId);
+        b.key(ACTIVE_KEY);
+        b.value(setToCsv(previousView.getActiveIds()));
+        b.key(RECOVERING_KEY);
+        b.value(setToCsv(previousView.getRecoveringIds()));
+        b.key(INACTIVE_KEY);
+        b.value(setToCsv(previousView.getInactiveIds()));
+        b.endObject();
+        h = b.toString();
+        return h;
+    }
+
+    /**
+     * helper method to convert a set to a comma-separated string (without using
+     * toString() for safety)
+     * 
+     * @return null if set is null or empty, comma-separated string (no spaces)
+     *         otherwise
+     */
+    private static String setToCsv(Set<Integer> ids) {
+        if (ids == null || ids.size() == 0) {
+            return null;
+        }
+        StringBuffer sb = new StringBuffer();
+        for (Integer id : ids) {
+            if (sb.length() != 0) {
+                sb.append(",");
+            }
+            sb.append(id);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * helper method to convert an array to a comma-separated string
+     * 
+     * @return null if array is null or empty, comma-separated string (no
+     *         spaces) otherwise
+     */
+    static String arrayToCsv(Integer[] arr) {
+        if (arr == null || arr.length == 0) {
+            return null;
+        }
+
+        StringBuffer sb = new StringBuffer();
+        for (Integer a : arr) {
+            if (sb.length() != 0) {
+                sb.append(",");
+            }
+            sb.append(a);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * inverse helper method which converts a comma-separated string into an
+     * integer array
+     **/
+    static Integer[] csvToIntegerArray(String csv) {
+        if (csv == null) {
+            return null;
+        }
+        String[] split = csv.split(",");
+        Integer[] result = new Integer[split.length];
+        for (int i = 0; i < split.length; i++) {
+            result[i] = Integer.parseInt(split[i]);
+        }
+        return result;
+    }
+
+    /**
+     * internal reader of an existing clusterView document from the settings
+     * collection
+     **/
+    private static ClusterViewDocument doRead(DocumentNodeStore documentNodeStore) {
+        DocumentStore documentStore = documentNodeStore.getDocumentStore();
+        Document doc = documentStore.find(Collection.SETTINGS, CLUSTERVIEW_DOC_ID,
+                -1 /* -1; avoid caching */);
+        if (doc == null) {
+            return null;
+        } else {
+            ClusterViewDocument clusterView = new ClusterViewDocument(doc);
+            if (clusterView.isValid()) {
+                return clusterView;
+            } else {
+                logger.warn("read: clusterView document is not valid: " + doc.format());
+                return null;
+            }
+        }
+    }
+
+    /** comparison helper that compares an integer array with a set **/
+    static boolean matches(Integer[] expected, Set<Integer> actual) {
+        boolean expectedIsEmpty = expected == null || expected.length == 0;
+        boolean actualIsEmpty = actual == null || actual.size() == 0;
+        if (expectedIsEmpty && actualIsEmpty) {
+            // if both are null or empty, they match
+            return true;
+        }
+        if (expectedIsEmpty != actualIsEmpty) {
+            // if one of them is only empty, but the other not, then they don't
+            // match
+            return false;
+        }
+        if (expected.length != actual.size()) {
+            // different size
+            return false;
+        }
+        for (int i = 0; i < expected.length; i++) {
+            Integer aMemberId = expected[i];
+            if (!actual.contains(aMemberId)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @SuppressWarnings("unchecked")
+    ClusterViewDocument(Document doc) {
+        if (doc == null) {
+            throw new IllegalArgumentException("doc must not be null");
+        }
+        this.clusterViewId = (String) doc.get(CLUSTER_VIEW_ID_KEY);
+        this.viewSeqNum = (Long) doc.get(VIEW_SEQ_NUM_KEY);
+        this.createdAt = (String) doc.get(CREATED_KEY);
+        this.createdBy = (Integer) doc.get(CREATOR_KEY);
+
+        Object obj = doc.get(ACTIVE_KEY);
+        if (obj == null || !(obj instanceof String)) {
+            logger.trace("<init>: {} : {}", ACTIVE_KEY, obj);
+            this.activeIds = new Integer[0];
+        } else {
+            this.activeIds = csvToIntegerArray((String) obj);
+        }
+
+        Object obj2 = doc.get(RECOVERING_KEY);
+        if (obj2 == null || !(obj2 instanceof String)) {
+            logger.trace("<init>: {} : {}", RECOVERING_KEY, obj2);
+            this.recoveringIds = new Integer[0];
+        } else {
+            this.recoveringIds = csvToIntegerArray((String) obj2);
+        }
+
+        Object obj3 = doc.get(INACTIVE_KEY);
+        if (obj3 == null || !(obj3 instanceof String)) {
+            logger.trace("<init>: {} : {}", INACTIVE_KEY, obj3);
+            this.inactiveIds = new Integer[0];
+        } else {
+            this.inactiveIds = csvToIntegerArray((String) obj3);
+        }
+
+        Object obj4 = doc.get(CLUSTER_VIEW_HISTORY_KEY);
+        if (obj4 == null || !(obj4 instanceof Map)) {
+            logger.trace("<init> viewHistory is null");
+            this.viewHistory = null;
+        } else {
+            this.viewHistory = ((Map<Object, String>) obj4);
+        }
+    }
+
+    /** Returns the set of active ids of this cluster view **/
+    Set<Integer> getActiveIds() {
+        return new HashSet<Integer>(Arrays.asList(activeIds));
+    }
+
+    /** Returns the set of recovering ids of this cluster view **/
+    Set<Integer> getRecoveringIds() {
+        return new HashSet<Integer>(Arrays.asList(recoveringIds));
+    }
+
+    /** Returns the set of inactive ids of this cluster view **/
+    Set<Integer> getInactiveIds() {
+        return new HashSet<Integer>(Arrays.asList(inactiveIds));
+    }
+
+    /** Returns the history map **/
+    private Map<Object, String> getHistory() {
+        return viewHistory;
+    }
+
+    @Override
+    public String toString() {
+        return "a ClusterView[valid=" + isValid() + ", viewSeqNum=" + viewSeqNum + ", clusterViewId=" + clusterViewId
+                + ", activeIds=" + arrayToCsv(activeIds) + ", recoveringIds=" + arrayToCsv(recoveringIds) + ", inactiveIds="
+                + arrayToCsv(inactiveIds) + "]";
+    }
+
+    boolean isValid() {
+        return viewSeqNum >= 0 && activeIds != null && activeIds.length > 0;
+    }
+
+    /**
+     * Returns the date+time when this view was created, for debugging purpose
+     * only
+     **/
+    String getCreatedAt() {
+        return createdAt;
+    }
+
+    /**
+     * Returns the id of the instance that created this view, for debugging
+     * purpose only
+     **/
+    int getCreatedBy() {
+        return createdBy;
+    }
+
+    /** Returns the monotonically incrementing sequence number of this view **/
+    long getViewSeqNum() {
+        return viewSeqNum;
+    }
+
+    /**
+     * Returns a UUID representing this cluster - will never change, propagates
+     * from view to view
+     **/
+    String getClusterViewId() {
+        return clusterViewId;
+    }
+
+    private boolean matches(Set<Integer> activeIds, Set<Integer> recoveringIds, Set<Integer> inactiveIds) {
+        if (!matches(this.activeIds, activeIds)) {
+            return false;
+        }
+        if (!matches(this.recoveringIds, recoveringIds)) {
+            return false;
+        }
+        if (!matches(this.inactiveIds, inactiveIds)) {
+            return false;
+        }
+        return true;
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java
------------------------------------------------------------------------------
    svn:eol-style = native
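
The null return value of readOrUpdate implies a small retry loop on the caller side; the sketch below only illustrates that contract - the retry bound, the sleep and the reuse of the id sets across attempts are assumptions for illustration (ideally the expected ids would be recomputed from the clusterNodes collection before each attempt).

    package org.apache.jackrabbit.oak.plugins.document;

    import java.util.Set;

    class ClusterViewUpdateSample {
        // retries the conditional update a few times; readOrUpdate returns null when
        // another instance updated the clusterView concurrently
        static ClusterViewDocument updateWithRetry(DocumentNodeStore ns, Set<Integer> activeIds,
                Set<Integer> recoveringIds, Set<Integer> inactiveIds) throws InterruptedException {
            for (int attempt = 0; attempt < 5; attempt++) {
                ClusterViewDocument doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
                if (doc != null) {
                    return doc; // the view was unchanged or our update went through
                }
                Thread.sleep(50); // someone else won the race - re-read and retry shortly
            }
            return null; // give up after a few attempts and let the caller decide
        }
    }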

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java?rev=1697355&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java Mon Aug 24 11:25:48 2015
@@ -0,0 +1,660 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import javax.jcr.Value;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.jackrabbit.commons.SimpleValueFactory;
+import org.apache.jackrabbit.oak.api.Descriptors;
+import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The DocumentDiscoveryLiteService takes care of providing a repository
+ * descriptor that contains the current cluster-view details.
+ * <p>
+ * The clusterView is provided via a repository descriptor (see
+ * OAK_DISCOVERYLITE_CLUSTERVIEW)
+ * <p>
+ * The cluster-view lists all instances (ever) known in the cluster in one of
+ * the following states:
+ * <ul>
+ * <li>active: the instance is currently running and has an up-to-date lease
+ * </li>
+ * <li>deactivating: the instance failed to update the lease recently, thus a
+ * recovery is happening - or the recovery has just finished and the local
+ * instance has yet to do a backgroundRead to pick up the crashed/shutdown
+ * instance's last changes</li>
+ * <li>inactive: the instance is currently not running and all its changes have
+ * been seen by the local instance</li>
+ * </ul>
+ * <p>
+ * Additionally, the cluster-view is assigned a monotonically increasing
+ * sequence number. This sequence number is persisted, thus all instances in
+ * the cluster will show the same sequence number for a particular cluster-view
+ * in time.
+ * <p>
+ * Note that the 'deactivating' state might be hiding some complexity that is
+ * deliberately not shown: for the documentNS the state 'deactivating' consists
+ * of two substates: 'recovering' as in _lastRevs are updated, and 'backlog
+ * processing' for a pending backgroundRead to get the latest head state of a
+ * crashed/shutdown instance. So when an instance is in 'deactivating' state, it
+ * is not indicated via the cluster-view whether it is recovering or has backlog
+ * to process. However, the fact that an instance has yet to do a backgroundRead
+ * to get changes is a per-instance story: other instances might already have
+ * done the backgroundRead and thus no longer have a backlog for the instance(s)
+ * that left. Even though 'deactivating' therefore is dependent on the instance
+ * you get the information from, the cluster-view must have a sequence number
+ * that uniquely identifies it in the cluster. These two constraints conflict.
+ * As a simple solution to handle this case nevertheless, the 'final' flag has
+ * been introduced: the cluster-view has this flag 'final' set to true when the
+ * view is final and nothing will be changed in this sequence number anymore. If
+ * the 'final' flag is false however it indicates that the cluster-view with
+ * this particular sequence number might still experience a change (more
+ * concretely: the deactivating instances might change). Note that
+ * alternatives to this 'final' flag have been discussed, such as using
+ * vector-counters, but there was no obvious gain to be achieved using an
+ * alternative approach.
+ * <p>
+ * In other words: whenever the 'final' flag is false, the view must be
+ * interpreted as 'in flux' wrt the deactivating/inactive instances and any
+ * action that depends on stable deactivating/inactive instances must not yet be
+ * done until the 'final' flag becomes true.
+ * <p>
+ * Underneath, the DocumentDiscoveryLiteService uses the clusterNodes collection
+ * to derive the clusterView, which it stores in the settings collection.
+ * Whenever it updates the clusterView it increments the sequence number by 1.
+ * <p>
+ * While this new 'clusterView' document in the settings collection sounds like
+ * redundant data (since it is just derived from the clusterNodes), it actually
+ * is not. By persisting the clusterView it becomes the new source of truth wrt
+ * what the clusterView looks like. And no two instances in the same cluster can
+ * make different conclusions based eg on different clocks they have or based on
+ * reading the clusterNodes in a slightly different moment etc. Also, the
+ * clusterView allows storing two additional attributes: the
+ * clusterViewId (which is the permanent id for this cluster similar to the
+ * slingId being a permanent id for an instance) as well as the sequence number
+ * (which allows the instances to make reference to the same clusterView, and be
+ * able to simply detect whether anything has changed)
+ * <p>
+ * Prerequisites for the clusterView mechanism to be stable:
+ * <ul>
+ * <li>the machine clocks are reasonably in sync - that is, they should be off
+ * by magnitudes less than the lease updateFrequency/timeout</li>
+ * <li>the write-delays from any instance to the mongo server where the
+ * clusterNodes and settings collections are stored should be very fast - at
+ * least orders of magnitudes lower again than the lease timeout</li>
+ * <li>when this instance notices that others have kicked it out of the
+ * clusterView (which it can find out when either its clusterNodes document is
+ * set to recovering or it is not in the clusterView anymore, although it just
+ * was - ie not just because of a fresh start), then this instance must step
+ * back gracefully. The exact definition is to be applied elsewhere - but it
+ * should include: stopping to update its own lease, waiting for the view to
+ * have stabilized - waiting for recovery of its own instance by the remaining
+ * instances in the cluster to have finished - and then probably waiting for
+ * another gracePeriod until it might rejoin the cluster. In between, any commit
+ * should fail with BannedFromClusterException</li>
+ * </ul>
+ * 
+ * @see #OAK_DISCOVERYLITE_CLUSTERVIEW
+ */
+@Component(immediate = true, name = DocumentDiscoveryLiteService.COMPONENT_NAME)
+@Service(value = { DocumentDiscoveryLiteService.class, Observer.class })
+public class DocumentDiscoveryLiteService implements ClusterStateChangeListener, Observer {
+
+    static final String COMPONENT_NAME = "org.apache.jackrabbit.oak.plugins.document.DocumentDiscoveryLiteService";
+
+    /**
+     * Name of the repository descriptor via which the clusterView is published
+     * - which is the raison d'etre of the DocumentDiscoveryLiteService
+     **/
+    public static final String OAK_DISCOVERYLITE_CLUSTERVIEW = "oak.discoverylite.clusterview";
+
+    private static final Logger logger = LoggerFactory.getLogger(DocumentDiscoveryLiteService.class);
+
+    /** describes the reason why the BackgroundWorker should be woken up **/
+    private static enum WakeupReason {
+        CLUSTER_STATE_CHANGED, BACKGROUND_READ_FINISHED
+    }
+
+    /**
+     * The BackgroundWorker takes care of regularly invoking checkView -
+     * which in turn will detect if anything changed
+     **/
+    private class BackgroundWorker implements Runnable {
+
+        final Random random = new Random();
+
+        boolean stopped = false;
+
+        private void stop() {
+            logger.trace("stop: start");
+            synchronized (BackgroundWorker.this) {
+                stopped = true;
+            }
+            logger.trace("stop: end");
+        }
+
+        @Override
+        public void run() {
+            logger.info("BackgroundWorker.run: start");
+            try {
+                doRun();
+            } finally {
+                logger.info("BackgroundWorker.run: end {finally}");
+            }
+        }
+
+        private void doRun() {
+            while (!stopped) {
+                try {
+                    logger.trace("BackgroundWorker.doRun: going to call checkView");
+                    boolean shortSleep = checkView();
+                    logger.trace("BackgroundWorker.doRun: checkView terminated with {} (=shortSleep)", shortSleep);
+                    long sleepMillis = shortSleep ? (50 + random.nextInt(450)) : 5000;
+                    logger.trace("BackgroundWorker.doRun: sleeping {}ms", sleepMillis);
+                    synchronized (BackgroundWorker.this) {
+                        if (stopped)
+                            return;
+                        BackgroundWorker.this.wait(sleepMillis);
+                        if (stopped)
+                            return;
+                    }
+                    logger.trace("BackgorundWorker.doRun: done sleeping, looping");
+                } catch (Exception e) {
+                    logger.error("doRun: got an exception: " + e, e);
+                    try {
+                        Thread.sleep(5000);
+                    } catch (Exception e2) {
+                        logger.error("doRun: got an exception while sleeping due to another exception: " + e2, e2);
+                    }
+                }
+            }
+        }
+
+    }
+
+    /** This provides the 'clusterView' repository descriptors **/
+    private class DiscoveryLiteDescriptor implements Descriptors {
+
+        final SimpleValueFactory factory = new SimpleValueFactory();
+
+        @Override
+        public String[] getKeys() {
+            return new String[] { OAK_DISCOVERYLITE_CLUSTERVIEW };
+        }
+
+        @Override
+        public boolean isStandardDescriptor(String key) {
+            if (!OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key)) {
+                return false;
+            }
+            return true;
+        }
+
+        @Override
+        public boolean isSingleValueDescriptor(String key) {
+            if (!OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key)) {
+                return false;
+            }
+            return true;
+        }
+
+        @Override
+        public Value getValue(String key) {
+            if (!OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key)) {
+                return null;
+            }
+            return factory.createValue(getClusterViewAsDescriptorValue());
+        }
+
+        @Override
+        public Value[] getValues(String key) {
+            if (!OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key)) {
+                return null;
+            }
+            return new Value[] { getValue(key) };
+        }
+
+    }
+
+    /** DocumentNodeStore's (hence local) clusterId **/
+    private int clusterNodeId = -1;
+
+    /**
+     * the DocumentNodeStore - used to get the active/inactive cluster ids from
+     **/
+    private DocumentNodeStore documentNodeStore;
+
+    /**
+     * background job that periodically verifies and updates the clusterView
+     **/
+    private BackgroundWorker backgroundWorker;
+
+    /** the ClusterViewDocument which was used in the last checkView run **/
+    private ClusterViewDocument previousClusterViewDocument;
+
+    /**
+     * the ClusterView that was valid as a result of the previous checkView run
+     **/
+    private ClusterView previousClusterView;
+
+    /**
+     * kept volatile as this is frequently read in contentChanged which is
+     * better kept unsynchronized as long as possible
+     **/
+    private volatile boolean hasInstancesWithBacklog;
+
+    /**
+     * Require a static reference to the NodeStore. Note that this implies the
+     * service is only active for documentNS
+     **/
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY, policy = ReferencePolicy.STATIC)
+    private volatile DocumentNodeStore nodeStore;
+
+    /**
+     * inactive nodes that have been so for a while, ie they have no backlog
+     * anymore, so no need to check for backlog every time
+     **/
+    private Set<Integer> longTimeInactives = new HashSet<Integer>();
+
+    /**
+     * returns the clusterView as a json value for it to be provided via the
+     * repository descriptor
+     **/
+    private String getClusterViewAsDescriptorValue() {
+        if (previousClusterView == null) {
+            return null;
+        } else {
+            return previousClusterView.asDescriptorValue();
+        }
+    }
+
+    /**
+     * On activate the DocumentDiscoveryLiteService tries to start the
+     * background job
+     */
+    @Activate
+    public void activate(ComponentContext context) {
+        logger.trace("activate: start");
+
+        // register this service as ClusterStateChangeListener with the DocumentNodeStore
+        this.documentNodeStore = (DocumentNodeStore) nodeStore;
+        documentNodeStore.setClusterStateChangeListener(this);
+
+        // retrieve the clusterId
+        clusterNodeId = documentNodeStore.getClusterId();
+
+        // start the background worker
+        backgroundWorker = new BackgroundWorker();
+        Thread th = new Thread(backgroundWorker, "DocumentDiscoveryLiteService-BackgroundWorker-[" + clusterNodeId + "]");
+        th.setDaemon(true);
+        th.start();
+
+        // register the Descriptors - for Oak to pass it upwards
+        if (context != null) {
+            OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext());
+            whiteboard.register(Descriptors.class, new DiscoveryLiteDescriptor(), Collections.emptyMap());
+        }
+        logger.trace("activate: end");
+    }
+
+    /**
+     * On deactivate the background job is stopped - if it was running at all
+     **/
+    @Deactivate
+    protected void deactivate() {
+        logger.trace("deactivate: start");
+
+        if (backgroundWorker != null) {
+            backgroundWorker.stop();
+            backgroundWorker = null;
+        }
+        logger.trace("deactivate: end");
+    }
+
+    /**
+     * Checks if anything changed in the current view and updates the service
+     * fields accordingly.
+     * 
+     * @return true if anything changed or is about to be changed (eg
+     *         recovery/backlog), false if the view is stable
+     */
+    private boolean checkView() {
+        logger.trace("checkView: start");
+        List<ClusterNodeInfoDocument> allClusterNodes = ClusterNodeInfoDocument.all(documentNodeStore.getDocumentStore());
+
+        Map<Integer, ClusterNodeInfoDocument> allNodeIds = new HashMap<Integer, ClusterNodeInfoDocument>();
+        Map<Integer, ClusterNodeInfoDocument> activeNotTimedOutNodes = new HashMap<Integer, ClusterNodeInfoDocument>();
+        Map<Integer, ClusterNodeInfoDocument> activeButTimedOutNodes = new HashMap<Integer, ClusterNodeInfoDocument>();
+        Map<Integer, ClusterNodeInfoDocument> recoveringNodes = new HashMap<Integer, ClusterNodeInfoDocument>();
+        Map<Integer, ClusterNodeInfoDocument> backlogNodes = new HashMap<Integer, ClusterNodeInfoDocument>();
+        Map<Integer, ClusterNodeInfoDocument> inactiveNoBacklogNodes = new HashMap<Integer, ClusterNodeInfoDocument>();
+
+        for (Iterator<ClusterNodeInfoDocument> it = allClusterNodes.iterator(); it.hasNext();) {
+            ClusterNodeInfoDocument clusterNode = it.next();
+            allNodeIds.put(clusterNode.getClusterId(), clusterNode);
+            if (clusterNode.isBeingRecovered()) {
+                recoveringNodes.put(clusterNode.getClusterId(), clusterNode);
+            } else if (!clusterNode.isActive()) {
+                if (hasBacklog(clusterNode)) {
+                    backlogNodes.put(clusterNode.getClusterId(), clusterNode);
+                } else {
+                    inactiveNoBacklogNodes.put(clusterNode.getClusterId(), clusterNode);
+                }
+            } else if (clusterNode.getLeaseEndTime() < System.currentTimeMillis()) {
+                activeButTimedOutNodes.put(clusterNode.getClusterId(), clusterNode);
+            } else {
+                activeNotTimedOutNodes.put(clusterNode.getClusterId(), clusterNode);
+            }
+        }
+
+        // the current view should now consist of:
+        // activeNotTimedOutNodes and activeButTimedOutNodes!
+        // (the timed-out ones are included because they have yet to switch to
+        // recovering or inactive - we don't know that yet, that would be
+        // predicting the future - so for now we have to stick with including
+        // them in the view)
+        Map<Integer, ClusterNodeInfoDocument> allActives;
+        allActives = new HashMap<Integer, ClusterNodeInfoDocument>(activeNotTimedOutNodes);
+        allActives.putAll(activeButTimedOutNodes);
+
+        // terminology (see the example sketched below):
+        // 'inactivating' nodes are those that are either 'recovering' or
+        // 'backlog' ones
+        // 'recovering' nodes are those for which one node is doing the
+        // recover() of lastRevs
+        // 'backlog' nodes are those that are no longer active and have
+        // finished the recover(), but for which a backgroundRead is still
+        // pending to read their latest root changes.
+
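+        // Example (illustrative): assume node 3 crashed. It first shows up as
+        // 'recovering' while another node runs the recover() of node 3's
+        // lastRevs. Once recovery is done it becomes a 'backlog' node until the
+        // local backgroundRead has picked up node 3's last root changes, and
+        // only then does it count as a plain inactive node.
+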
+        logger.debug(
+                "checkView: active nodes: {}, timed out nodes: {}, recovering nodes: {}, backlog nodes: {}, inactive nodes: {}, total: {}, hence view nodes: {}",
+                activeNotTimedOutNodes.size(), activeButTimedOutNodes.size(), recoveringNodes.size(), backlogNodes.size(),
+                inactiveNoBacklogNodes.size(), allNodeIds.size(), allActives.size());
+
+        ClusterViewDocument originalView = previousClusterViewDocument;
+        ClusterViewDocument newView = doCheckView(allActives.keySet(), recoveringNodes.keySet(), backlogNodes.keySet(),
+                inactiveNoBacklogNodes.keySet());
+        if (newView == null) {
+            logger.trace("checkView: end. newView: null");
+            return true;
+        }
+        boolean newHasInstancesWithBacklog = recoveringNodes.size() > 0 || backlogNodes.size() > 0;
+        boolean changed = originalView == null || (newView.getViewSeqNum() != originalView.getViewSeqNum())
+                || (newHasInstancesWithBacklog != hasInstancesWithBacklog);
+        logger.debug("checkView: viewFine: {}, changed: {}, originalView: {}, newView: {}", newView != null, changed, originalView,
+                newView);
+
+        if (longTimeInactives.addAll(inactiveNoBacklogNodes.keySet())) {
+            logger.debug("checkView: updated longTimeInactives to {} (inactiveNoBacklogNodes: {})", longTimeInactives,
+                    inactiveNoBacklogNodes);
+        }
+
+        if (changed) {
+            ClusterView v = ClusterView.fromDocument(clusterNodeId, newView, backlogNodes.keySet());
+            ClusterView previousView = previousClusterView;
+            previousClusterView = v;
+            hasInstancesWithBacklog = newHasInstancesWithBacklog;
+            logger.info("checkView: view changed from: " + previousView + ", to: " + v + ", hasInstancesWithBacklog: "
+                    + hasInstancesWithBacklog);
+            return true;
+        } else {
+            logger.debug("checkView: no changes whatsoever, still at view: " + previousClusterView);
+            return hasInstancesWithBacklog;
+        }
+    }
+
+    private Revision getLastKnownRevision(int clusterNodeId) {
+        String[] lastKnownRevisions = documentNodeStore.getMBean().getLastKnownRevisions();
+        for (int i = 0; i < lastKnownRevisions.length; i++) {
+            String aLastKnownRevisionStr = lastKnownRevisions[i];
+            String[] split = aLastKnownRevisionStr.split("=");
+            if (split.length == 2) {
+                try {
+                    Integer id = Integer.parseInt(split[0]);
+                    if (id == clusterNodeId) {
+                        final Revision lastKnownRev = Revision.fromString(split[1]);
+                        logger.trace("getLastKnownRevision: end. clusterNode: {}, lastKnownRevision: {}", clusterNodeId,
+                                lastKnownRev);
+                        return lastKnownRev;
+                    }
+                } catch (NumberFormatException nfe) {
+                    logger.warn("getLastKnownRevision: could not parse integer '" + split[0] + "': " + nfe, nfe);
+                }
+            } else {
+                logger.warn("getLastKnownRevision: cannot parse lastKnownRevision: " + aLastKnownRevisionStr);
+            }
+        }
+        logger.warn("getLastKnownRevision: no lastKnownRevision found for " + clusterNodeId);
+        return null;
+    }
+
+    private boolean hasBacklog(ClusterNodeInfoDocument clusterNode) {
+        if (logger.isTraceEnabled()) {
+            logger.trace("hasBacklog: start. clusterNodeId: {}", clusterNode.getClusterId());
+        }
+        Revision lastKnownRevision = getLastKnownRevision(clusterNode.getClusterId());
+        if (lastKnownRevision == null) {
+            logger.warn("hasBacklog: no lastKnownRevision found, hence cannot determine backlog for node "
+                    + clusterNode.getClusterId());
+            return false;
+        }
+
+        // The lastKnownRevision is what the local instance has last read/seen
+        // from another instance.
+        // This must be compared to what the other instance *actually* has
+        // written as the very last thing.
+        // Now the knowledge what the other instance has last written (after
+        // recovery) would sit
+        // in the root document - so that could in theory be used. But reading
+        // the root document
+        // would have to be done *uncached*. And that's quite a change to what
+        // the original
+        // idea was: that the root document would only be read every second, to
+        // avoid contention.
+        // So this 'what the other instance has last written' information is
+        // retrieved via
+        // a new, dedicated property in the clusterNodes collection: the
+        // 'lastWrittenRootRev'.
+        // The 'lastWrittenRootRev' is written by 'UnsavedModifications' during
+        // backgroundUpdate
+        // and retrieved here quite regularly (but it should not be a big deal,
+        // as the
+        // discovery-lite is the only one reading this field so frequently and
+        // it does not
+        // interfere with normal (jcr) nodes at all).
+        String lastWrittenRootRevStr = clusterNode.getLastWrittenRootRev();
+        if (lastWrittenRootRevStr == null) {
+            logger.warn("hasBacklog: node has lastWrittenRootRev=null");
+            return false;
+        }
+        Revision lastWrittenRootRev = Revision.fromString(lastWrittenRootRevStr);
+        if (lastWrittenRootRev == null) {
+            logger.warn("hasBacklog: node has no lastWrittenRootRev: " + clusterNode.getClusterId());
+            return false;
+        }
+
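+        // Example (illustrative): if lastKnownRevision was created at time T1
+        // and lastWrittenRootRev at time T2 with T1 < T2, the local instance
+        // has not yet read everything the other instance wrote, i.e. there is
+        // backlog and getTimestampDifference(lastKnownRevision, lastWrittenRootRev) < 0.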
+        boolean hasBacklog = Revision.getTimestampDifference(lastKnownRevision, lastWrittenRootRev) < 0;
+        if (logger.isDebugEnabled()) {
+            logger.debug("hasBacklog: clusterNodeId: {}, lastKnownRevision: {}, lastWrittenRootRev: {}, hasBacklog: {}",
+                    clusterNode.getClusterId(), lastKnownRevision, lastWrittenRootRev, hasBacklog);
+        }
+        return hasBacklog;
+    }
+
+    private ClusterViewDocument doCheckView(final Set<Integer> activeNodes, final Set<Integer> recoveringNodes,
+            final Set<Integer> backlogNodes, final Set<Integer> inactiveNodes) {
+        logger.trace("doCheckView: start: activeNodes: {}, recoveringNodes: {}, backlogNodes: {}, inactiveNodes: {}", activeNodes,
+                recoveringNodes, backlogNodes, inactiveNodes);
+
+        Set<Integer> allInactives = new HashSet<Integer>();
+        allInactives.addAll(inactiveNodes);
+        allInactives.addAll(backlogNodes);
+
+        if (activeNodes.size() == 0) {
+            // then we have zero active nodes - that is unexpected, as it would
+            // imply that even our own node is not active -
+            // hence handle with care - ie wait until we see an active node again
+            logger.warn("doCheckView: empty active ids. activeNodes:{}, recoveringNodes:{}, inactiveNodes:{}", activeNodes,
+                    recoveringNodes, inactiveNodes);
+            return null;
+        }
+        ClusterViewDocument newViewOrNull;
+        try {
+            newViewOrNull = ClusterViewDocument.readOrUpdate(documentNodeStore, activeNodes, recoveringNodes, allInactives);
+        } catch (RuntimeException re) {
+            logger.error("doCheckView: RuntimeException: re: " + re, re);
+            return null;
+        } catch (Error er) {
+            logger.error("doCheckView: Error: er: " + er, er);
+            return null;
+        }
+        logger.trace("doCheckView: readOrUpdate result: {}", newViewOrNull);
+
+        // and now for some verbose documentation and logging:
+        if (newViewOrNull == null) {
+            // then there was a concurrent update of the clusterView
+            // and we should do some quick backoff sleeping
+            logger.debug("doCheckView: readOrUpdate returned null (concurrent update), backing off");
+            return null;
+        } else {
+            // otherwise we now hold the newly valid view - it could be the
+            // same as or different from the previous one, so let's check
+            if (previousClusterViewDocument == null) {
+                // oh ok, this is the very first one
+                previousClusterViewDocument = newViewOrNull;
+                logger.debug("doCheckView: end. first ever view: {}", newViewOrNull);
+                return newViewOrNull;
+            } else if (previousClusterViewDocument.getViewSeqNum() == newViewOrNull.getViewSeqNum()) {
+                // that's the normal case: the viewId matches, nothing has
+                // changed, we've already
+                // processed the previousClusterView, so:
+                logger.debug("doCheckView: end. seqNum did not change. view: {}", newViewOrNull);
+                return newViewOrNull;
+            } else {
+                // otherwise the view has changed
+                logger.info("doCheckView: view has changed from: {} to: {} - sending event...", previousClusterViewDocument,
+                        newViewOrNull);
+                previousClusterViewDocument = newViewOrNull;
+                logger.debug("doCheckView: end. changed view: {}", newViewOrNull);
+                return newViewOrNull;
+            }
+        }
+    }
+
+    @Override
+    public void handleClusterStateChange() {
+        // handleClusterStateChange is needed to learn about any state change
+        // in the clusterNodes collection asap and to be able to react to it -
+        // so this wakes up the backgroundWorker, which in turn will - in a
+        // separate thread - check the view and send out events accordingly
+        wakeupBackgroundWorker(WakeupReason.CLUSTER_STATE_CHANGED);
+    }
+
+    private void wakeupBackgroundWorker(WakeupReason wakeupReason) {
+        final BackgroundWorker bw = backgroundWorker;
+        if (bw != null) {
+            // take a local copy of this.hasInstancesWithBacklog for use in the
+            // code below
+            boolean hasInstancesWithBacklog = this.hasInstancesWithBacklog;
+
+            if (wakeupReason == WakeupReason.BACKGROUND_READ_FINISHED) {
+                // then only forward the notify if 'hasInstancesWithBacklog',
+                // ie if we have anything we could be waiting for - otherwise
+                // we don't need to wake up the background thread
+                if (!hasInstancesWithBacklog) {
+                    logger.trace(
+                            "wakeupBackgroundWorker: not waking up backgroundWorker, as we do not have any instances with backlog");
+                    return;
+                }
+            }
+            logger.trace("wakeupBackgroundWorker: waking up backgroundWorker, reason: {} (hasInstancesWithBacklog: {})",
+                    wakeupReason, hasInstancesWithBacklog);
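+            // note: the BackgroundWorker is assumed to wait() on its own
+            // monitor with a timeout (cf. the 5sec fallback mentioned in
+            // contentChanged), so this notifyAll() merely lets it re-check the
+            // view earlier than it otherwise would.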
+            synchronized (bw) {
+                bw.notifyAll();
+            }
+        }
+    }
+
+    /**
+     * <p>
+     * Additionally the DocumentDiscoveryLiteService must be notified when the
+     * background-read has finished - as it could be waiting for a crashed
+     * node's recovery to finish - which it can only do by checking the
+     * lastKnownRevision of the crashed instance - and that check is best done
+     * right after the background read has finished (it could optionally do
+     * that purely time-based as well, but going via a listener is more timely,
+     * which is why this approach was chosen).
+     */
+    @Override
+    public void contentChanged(NodeState root, CommitInfo info) {
+        // contentChanged is only used to react as quickly as possible when we
+        // have instances that have a 'backlog' - ie when instances crashed and
+        // are being recovered - then we must wait until the recovery is
+        // finished AND until the subsequent background read has actually read
+        // that instance's last changes. To catch that moment as quickly as
+        // possible, this contentChanged is used.
+        // From the above it also follows that this only wakes up the
+        // backgroundWorker if we have any pending 'backlog' instances -
+        // otherwise this is a no-op
+        if (info == null) {
+            // then ignore this as this is likely an external change
+            // note: it could be a compacted change, in which case we should
+            // probably still process it - but we have a 5sec fallback
+            // in the BackgroundWorker to handle that case too,
+            // so:
+            logger.trace("contentChanged: ignoring content change due to commit info being null");
+            return;
+        }
+        logger.trace("contentChanged: handling content changed by waking up worker if necessary");
+        wakeupBackgroundWorker(WakeupReason.BACKGROUND_READ_FINISHED);
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1697355&r1=1697354&r2=1697355&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Mon Aug 24 11:25:48 2015
@@ -348,6 +348,16 @@ public final class DocumentNodeStore
     private final BlobStore blobStore;
 
     /**
+     * The clusterStateChangeListener is invoked on any noticed change in the
+     * clusterNodes collection.
+     * <p>
+     * Note that there is no synchronization between setting this one and using
+     * it, but arguably that is not necessary since it will be set at startup
+     * time and then never be changed.
+     */
+    private ClusterStateChangeListener clusterStateChangeListener;
+
+    /**
      * The BlobSerializer.
      */
     private final BlobSerializer blobSerializer = new BlobSerializer() {
@@ -1637,7 +1647,8 @@ public final class DocumentNodeStore
         runBackgroundReadOperations();
     }
 
-    private void runBackgroundUpdateOperations() {
+    /** Note: made package-protected for testing purposes, would otherwise be private **/
+    void runBackgroundUpdateOperations() {
         if (isDisposed.get()) {
             return;
         }
@@ -1680,7 +1691,8 @@ public final class DocumentNodeStore
 
     //----------------------< background read operations >----------------------
 
-    private void runBackgroundReadOperations() {
+    /** Note: made package-protected for testing purposes, would otherwise be private **/
+    void runBackgroundReadOperations() {
         if (isDisposed.get()) {
             return;
         }
@@ -1724,8 +1736,11 @@ public final class DocumentNodeStore
     /**
      * Updates the state about cluster nodes in {@link #activeClusterNodes}
      * and {@link #inactiveClusterNodes}.
+     * @return true if the cluster state has changed, false if the cluster state
+     * remained unchanged
      */
-    void updateClusterState() {
+    boolean updateClusterState() {
+        boolean hasChanged = false;
         long now = clock.getTime();
         Set<Integer> inactive = Sets.newHashSet();
         for (ClusterNodeInfoDocument doc : ClusterNodeInfoDocument.all(store)) {
@@ -1733,14 +1748,15 @@ public final class DocumentNodeStore
             if (cId != this.clusterId && !doc.isActive()) {
                 inactive.add(cId);
             } else {
-                activeClusterNodes.put(cId, doc.getLeaseEndTime());
+                hasChanged |= activeClusterNodes.put(cId, doc.getLeaseEndTime()) == null;
             }
         }
-        activeClusterNodes.keySet().removeAll(inactive);
-        inactiveClusterNodes.keySet().retainAll(inactive);
+        hasChanged |= activeClusterNodes.keySet().removeAll(inactive);
+        hasChanged |= inactiveClusterNodes.keySet().retainAll(inactive);
         for (Integer clusterId : inactive) {
-            inactiveClusterNodes.putIfAbsent(clusterId, now);
+            hasChanged |= inactiveClusterNodes.putIfAbsent(clusterId, now) == null;
         }
+        return hasChanged;
     }
 
     /**
@@ -2436,6 +2452,16 @@ public final class DocumentNodeStore
         return blobGC;
     }
 
+    void setClusterStateChangeListener(ClusterStateChangeListener clusterStateChangeListener) {
+        this.clusterStateChangeListener = clusterStateChangeListener;
+    }
+
+    void signalClusterStateChange() {
+        if (clusterStateChangeListener != null) {
+            clusterStateChangeListener.handleClusterStateChange();
+        }
+    }
+
     //-----------------------------< DocumentNodeStoreMBean >---------------------------------
 
     public DocumentNodeStoreMBean getMBean() {
@@ -2607,8 +2633,14 @@ public final class DocumentNodeStore
 
         @Override
         protected void execute(@Nonnull DocumentNodeStore nodeStore) {
-            if (nodeStore.renewClusterIdLease()) {
-                nodeStore.updateClusterState();
+            // first renew the clusterId lease
+            nodeStore.renewClusterIdLease();
+
+            // then, independent of whether the lease had to be updated or not,
+            // check the cluster state:
+            if (nodeStore.updateClusterState()) {
+                // then inform the discovery lite listener - if it is registered
+                nodeStore.signalClusterStateChange();
             }
         }
     }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1697355&r1=1697354&r2=1697355&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java Mon Aug 24 11:25:48 2015
@@ -460,7 +460,10 @@ public class DocumentNodeStoreService {
         Dictionary<String, Object> props = new Hashtable<String, Object>();
         props.put(Constants.SERVICE_PID, DocumentNodeStore.class.getName());
         props.put(DESCRIPTION, getMetadata(ds));
-        reg = context.getBundleContext().registerService(NodeStore.class.getName(), store, props);
+        // OAK-2844: in order to allow DocumentDiscoveryLiteService to directly
+        // require a service DocumentNodeStore (instead of having to do an 'instanceof')
+        // the registration is now done for both NodeStore and DocumentNodeStore here.
+        reg = context.getBundleContext().registerService(new String[]{NodeStore.class.getName(), DocumentNodeStore.class.getName()}, store, props);
     }
 
     @Deactivate

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java?rev=1697355&r1=1697354&r2=1697355&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java Mon Aug 24 11:25:48 2015
@@ -278,8 +278,17 @@ public class LastRevRecoveryAgent {
             return recover(suspects.iterator(), clusterId);
         } finally {
             Utils.closeIfCloseable(suspects);
-            // Relinquish the lock on the recovery for the cluster on the clusterInfo
+
+            // Relinquish the lock on the recovery for the cluster on the
+            // clusterInfo
+            // TODO: in case recover throws a RuntimeException (or Error..) the
+            // recovery might have failed, yet the instance is marked
+            // as 'recovered' (by setting the state to NONE).
+            // Is this really fine here? Or shouldn't we rather retry - or at
+            // least log the throwable?
             missingLastRevUtil.releaseRecoveryLock(clusterId);
+
+            nodeStore.signalClusterStateChange();
         }
     }
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java?rev=1697355&r1=1697354&r2=1697355&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java Mon Aug 24 11:25:48 2015
@@ -214,6 +214,17 @@ class UnsavedModifications {
                 lastRev = null;
             }
         }
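+        // OAK-2844: additionally store the root revision that was just written
+        // ('lastWrittenRootRev') in the corresponding clusterNodes entry, so
+        // that the discovery-lite service can compare it with the
+        // lastKnownRevision of other instances to detect a backlog
+        // (see DocumentDiscoveryLiteService#hasBacklog).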
+        Revision writtenRootRev = pending.get("/");
+        if (writtenRootRev != null) {
+            int cid = writtenRootRev.getClusterId();
+            if (store.getDocumentStore().find(org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES, String.valueOf(cid)) != null) {
+                UpdateOp update = new UpdateOp(String.valueOf(cid), false);
+                update.equals(Document.ID, null, String.valueOf(cid));
+                update.set(ClusterNodeInfo.LAST_WRITTEN_ROOT_REV_KEY, writtenRootRev.toString());
+                store.getDocumentStore().findAndUpdate(org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES, update);
+            }
+        }
+
         stats.write = clock.getTime() - time;
         return stats;
     }

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java?rev=1697355&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java Mon Aug 24 11:25:48 2015
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Test helper class that simplifies creating ClusterView and
+ * ClusterViewDocument objects (see the usage sketch after the constructor)
+ **/
+class ClusterViewBuilder {
+
+    private final Set<Integer> activeIds = new HashSet<Integer>();
+    private final Set<Integer> recoveringIds = new HashSet<Integer>();
+    private final Set<Integer> backlogIds = new HashSet<Integer>();
+    private final Set<Integer> inactiveIds = new HashSet<Integer>();
+    private final long viewSeqNum;
+    private final String clusterViewId;
+    private final int myId;
+
+    ClusterViewBuilder(long viewSeqNum, String clusterViewId, int myId) {
+        this.viewSeqNum = viewSeqNum;
+        this.clusterViewId = clusterViewId;
+        this.myId = myId;
+    }
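+
+    /*
+     * Illustrative usage (a sketch; the ids below are arbitrary test values):
+     *
+     *   ClusterView view = new ClusterViewBuilder(1, "some-view-id", 2)
+     *           .active(1, 2).recovering(3).inactive(4, 5).backlogs(5)
+     *           .asView();
+     */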
+
+    public ClusterViewBuilder active(int... instanceIds) {
+        for (int i = 0; i < instanceIds.length; i++) {
+            int anId = instanceIds[i];
+            activeIds.add(anId);
+        }
+        return this;
+    }
+
+    public ClusterViewBuilder recovering(int... instanceIds) {
+        for (int i = 0; i < instanceIds.length; i++) {
+            int anId = instanceIds[i];
+            recoveringIds.add(anId);
+        }
+        return this;
+    }
+
+    public ClusterViewBuilder backlogs(int... instanceIds) {
+        for (int i = 0; i < instanceIds.length; i++) {
+            int anId = instanceIds[i];
+            backlogIds.add(anId);
+        }
+        return this;
+    }
+
+    public ClusterViewBuilder inactive(int... instanceIds) {
+        for (int i = 0; i < instanceIds.length; i++) {
+            int anId = instanceIds[i];
+            inactiveIds.add(anId);
+        }
+        return this;
+    }
+
+    public ClusterViewDocument asDoc() {
+        /*
+         * "_id" : "clusterView", "seqNum" : NumberLong(1), "inactive" : null,
+         * "clusterViewHistory" : {
+         * 
+         * }, "deactivating" : null, "created" : "2015-06-30T08:21:29.393+0200",
+         * "clusterViewId" : "882f8926-1112-493a-81a0-f946087b2986", "active" :
+         * "1", "creator" : 1, "_modCount" : NumberLong(1)
+         */
+        Document doc = new Document();
+        doc.put(ClusterViewDocument.VIEW_SEQ_NUM_KEY, viewSeqNum);
+        doc.put(ClusterViewDocument.INACTIVE_KEY, asArrayStr(inactiveIds));
+        doc.put(ClusterViewDocument.RECOVERING_KEY, asArrayStr(recoveringIds));
+        doc.put(ClusterViewDocument.ACTIVE_KEY, asArrayStr(activeIds));
+        doc.put(ClusterViewDocument.CLUSTER_VIEW_ID_KEY, clusterViewId);
+        ClusterViewDocument clusterViewDoc = new ClusterViewDocument(doc);
+        return clusterViewDoc;
+    }
+
+    public ClusterView asView() {
+        return ClusterView.fromDocument(myId, asDoc(), backlogIds);
+    }
+
+    private String asArrayStr(Set<Integer> ids) {
+        return ClusterViewDocument.arrayToCsv(asArray(ids));
+    }
+
+    private Integer[] asArray(Set<Integer> set) {
+        if (set.size() == 0) {
+            return null;
+        } else {
+            return set.toArray(new Integer[set.size()]);
+        }
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native