You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by st...@apache.org on 2015/08/24 13:25:49 UTC
svn commit: r1697355 [1/2] - in /jackrabbit/oak/trunk/oak-core: ./
src/main/java/org/apache/jackrabbit/oak/plugins/document/
src/test/java/org/apache/jackrabbit/oak/plugins/document/
Author: stefanegli
Date: Mon Aug 24 11:25:48 2015
New Revision: 1697355
URL: http://svn.apache.org/r1697355
Log:
OAK-2844 : Introducing a simple discovery-light service - currently only available when on documentMk - exposes a new repository descriptor oak.discoverylite.clusterview which lists all active/deactivating/inactive instances connected to the same document backend
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterStateChangeListener.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterView.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java (with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java (with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocumentTest.java (with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewTest.java (with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceTest.java (with props)
Modified:
jackrabbit/oak/trunk/oak-core/pom.xml
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
Modified: jackrabbit/oak/trunk/oak-core/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/pom.xml?rev=1697355&r1=1697354&r2=1697355&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-core/pom.xml Mon Aug 24 11:25:48 2015
@@ -317,6 +317,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>junit-addons</groupId>
+ <artifactId>junit-addons</artifactId>
+ <version>1.4</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>1.9.5</version>
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java?rev=1697355&r1=1697354&r2=1697355&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java Mon Aug 24 11:25:48 2015
@@ -67,6 +67,14 @@ public class ClusterNodeInfo {
public static final String LEASE_END_KEY = "leaseEnd";
/**
+ * The key for the root-revision of the last background write (of unsaved
+ * modifications) - that is: the last root-revision written by the instance
+ * itself in case of a clean shutdown, or written via a recovery performed by
+ * another instance in case of a crash.
+ *
+ * @see ClusterNodeInfoDocument#getLastWrittenRootRev()
+ */
+ public static final String LAST_WRITTEN_ROOT_REV_KEY = "lastWrittenRootRev";
+
+ /**
* The state of the cluster. On proper shutdown the state should be cleared.
*
* @see org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java?rev=1697355&r1=1697354&r2=1697355&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java Mon Aug 24 11:25:48 2015
@@ -77,4 +77,11 @@ public class ClusterNodeInfoDocument ext
private RecoverLockState getRecoveryState(){
return RecoverLockState.fromString((String) get(ClusterNodeInfo.REV_RECOVERY_LOCK));
}
+
+    /**
+     * Returns the root-revision recorded by the last background write (of
+     * unsaved modifications) of this cluster node.
+     *
+     * @return the value stored under
+     *         {@link ClusterNodeInfo#LAST_WRITTEN_ROOT_REV_KEY}, cast to a
+     *         String (presumably null when the entry is absent - depends on
+     *         Document#get)
+     */
+    public String getLastWrittenRootRev() {
+        final Object lastWrittenRootRev = get(ClusterNodeInfo.LAST_WRITTEN_ROOT_REV_KEY);
+        return (String) lastWrittenRootRev;
+    }
}
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterStateChangeListener.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterStateChangeListener.java?rev=1697355&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterStateChangeListener.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterStateChangeListener.java Mon Aug 24 11:25:48 2015
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+/**
+ * Internal callback of the DocumentNodeStore, invoked whenever a change in
+ * the clusterNodes collection is detected - i.e. an instance became active or
+ * inactive, timed out, or is being recovered.
+ */
+public interface ClusterStateChangeListener {
+
+    /**
+     * Called once the DocumentNodeStore has discovered that the clusterNodes
+     * collection has changed.
+     */
+    void handleClusterStateChange();
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterStateChangeListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterView.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterView.java?rev=1697355&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterView.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterView.java Mon Aug 24 11:25:48 2015
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.jackrabbit.oak.commons.json.JsopBuilder;
+
+/**
+ * A ClusterView represents the state of a cluster at a particular moment in
+ * time.
+ * <p>
+ * This is a combination of what is stored in the ClusterViewDocument and the
+ * list of instances that currently have a backlog.
+ * <p>
+ * In order to be able to differentiate and clearly identify the different
+ * states an instance is in, the ClusterView uses a slightly different
+ * terminology of states that it reports:
+ * <ul>
+ * <li>Active: (same as in the ClusterViewDocument) an instance that is alive
+ * and has no recoveryLock set. Whether or not the lease has timed out is
+ * ignored. If the lease has timed out, this will soon be noticed by one of
+ * the instances and the affected instance will thus be recovered.</li>
+ * <li>Deactivating: an instance that is either recovering (which is the state
+ * reported from the ClusterViewDocument) - ie it was active until now but the
+ * lease has just timed out and one of the peer instances noticed so and is
+ * doing a recovery now - or it is inactive but some of its changes are still
+ * in the backlog (the latter is not tracked in the ClusterViewDocument,
+ * instead instances with a backlog are in the 'inactive' bucket there).</li>
+ * <li>Inactive: an instance that is both inactive from a
+ * clusterNodes/ClusterViewDocument point of view (ie no longer active and
+ * already recovered) and has no backlog anymore.</li>
+ * </ul>
+ * The JSON generated by the ClusterView (which is propagated to JMX) has the
+ * following fields:
+ * <ul>
+ * <li>seq = sequence number: this is a monotonically increasing number
+ * assigned to each incarnation of the persisted clusterView (in the settings
+ * collection). It can be used to take note of the fact that a view has
+ * changed even though perhaps all activeIds are still the same (eg when the
+ * listener would have missed a few changes). It can also be used to tell with
+ * certainty that 'anything has changed' compared to the clusterView with a
+ * previous sequence number</li>
+ * <li>final = is final: this is a boolean indicating whether or not the view
+ * with a particular sequence number is final (not going to change anymore) or
+ * whether the discovery lite takes the freedom to modify the view in the
+ * future (false). So whenever 'final' is false, the view must be treated as
+ * 'in flux' and the user should perhaps wait before drawing any conclusions.
+ * That's not to say that if 'final' is false, the information provided in
+ * active/deactivating/inactive is wrong - that info is always correct. But
+ * when 'final' is false it just means that active/deactivating/inactive for a
+ * given sequence number might change. Views created via
+ * {@link #fromDocument(int, ClusterViewDocument, Set)} are final exactly when
+ * no instance has a backlog.</li>
+ * <li>id = cluster view id: this is the unique, stable identifier of the local
+ * cluster. The idea of this id is to provide both an actual identifier for the
+ * local cluster as well as a 'namespace' for the instanceIds therein. The
+ * instanceIds are all just simply integers and can of course be the same for
+ * instances in different clusters.</li>
+ * <li>me = my local instance id: this is the id of the local instance as
+ * managed by DocumentNodeStore</li>
+ * <li>active = active instance ids: this is the list of instance ids that are
+ * all currently active in the local cluster. The ids are managed by
+ * DocumentNodeStore</li>
+ * <li>deactivating = deactivating instance ids: this is the list of instance
+ * ids that are all in the process of deactivating and for which therefore some
+ * data might still be making its way to the local instance. So any changes
+ * that were done by instances that are deactivating might not yet be visible
+ * locally</li>
+ * <li>inactive = inactive instance ids: this is the list of instance ids that
+ * are neither running nor have any data pending to become visible by the
+ * local instance</li>
+ * </ul>
+ */
+class ClusterView {
+
+    /**
+     * the json containing the complete information of the state of this
+     * ClusterView. Created at constructor time for performance reasons (json
+     * will be polled via JMX very frequently, thus must be provided fast)
+     */
+    private final String json;
+
+    /**
+     * Factory method that creates a ClusterView given a ClusterViewDocument and
+     * a list of instances that currently have a backlog.
+     * <p>
+     * The ClusterViewDocument contains instances in the following states:
+     * <ul>
+     * <li>active</li>
+     * <li>recovering</li>
+     * <li>inactive</li>
+     * </ul>
+     * The ClusterView however reports these upwards as follows:
+     * <ul>
+     * <li>active: this is 1:1 the active ones from the ClusterViewDocument</li>
+     * <li>deactivating: this includes the recovering ones from the
+     * ClusterViewDocument plus those passed to this method in the backlogIds
+     * parameter</li>
+     * <li>inactive: this is the inactive ones from the ClusterViewDocument
+     * <b>minus</b> the backlogIds passed</li>
+     * </ul>
+     *
+     * @param localInstanceId
+     *            the id of the local instance (me)
+     * @param clusterViewDoc
+     *            the ClusterViewDocument which contains the currently persisted
+     *            cluster view
+     * @param backlogIds
+     *            the ids of instances for which the local instance has not yet
+     *            finished a background read and which thus still have a
+     *            backlog - each of them is expected to be listed as inactive
+     *            in clusterViewDoc
+     * @return the ClusterView representing the provided info; it is final
+     *         exactly when backlogIds is empty
+     * @throws IllegalStateException
+     *             if not all backlogIds are listed as inactive in
+     *             clusterViewDoc, or if the resulting view is invalid (see
+     *             constructor)
+     */
+    static ClusterView fromDocument(int localInstanceId, ClusterViewDocument clusterViewDoc, Set<Integer> backlogIds) {
+        Set<Integer> activeIds = clusterViewDoc.getActiveIds();
+        Set<Integer> deactivatingIds = new HashSet<Integer>();
+        deactivatingIds.addAll(clusterViewDoc.getRecoveringIds());
+        deactivatingIds.addAll(backlogIds);
+        Set<Integer> inactiveIds = new HashSet<Integer>();
+        inactiveIds.addAll(clusterViewDoc.getInactiveIds());
+        // paranoia check: every instance with a backlog must be listed as
+        // inactive. Note that removeAll() cannot be used for this check as it
+        // already returns true when just one of the backlogIds was removed.
+        if (!inactiveIds.containsAll(backlogIds)) {
+            throw new IllegalStateException(
+                    "not all backlogIds (" + backlogIds + ") are part of inactiveIds (" + clusterViewDoc.getInactiveIds() + ")");
+        }
+        // instances with a backlog are reported as deactivating, not inactive
+        inactiveIds.removeAll(backlogIds);
+        return new ClusterView(clusterViewDoc.getViewSeqNum(), backlogIds.isEmpty(), clusterViewDoc.getClusterViewId(),
+                localInstanceId, activeIds, deactivatingIds, inactiveIds);
+    }
+
+    /**
+     * Creates a ClusterView and eagerly renders its JSON representation.
+     *
+     * @param viewSeqNum
+     *            the sequence number of the view - must be zero or higher
+     * @param viewFinal
+     *            whether the view is final
+     * @param clusterViewId
+     *            the stable id of the cluster - must not be null or empty
+     * @param localId
+     *            the id of the local instance - must be zero or higher
+     * @param activeIds
+     *            the active instance ids - must not be null or empty
+     * @param deactivatingIds
+     *            the deactivating instance ids - must not be null
+     * @param inactiveIds
+     *            the inactive instance ids - must not be null
+     * @throws IllegalStateException
+     *             if any of the above constraints is violated
+     */
+    ClusterView(final long viewSeqNum, final boolean viewFinal, final String clusterViewId, final int localId,
+            final Set<Integer> activeIds, final Set<Integer> deactivatingIds, final Set<Integer> inactiveIds) {
+        if (viewSeqNum < 0) {
+            throw new IllegalStateException("viewSeqNum must be zero or higher: " + viewSeqNum);
+        }
+        if (clusterViewId == null || clusterViewId.length() == 0) {
+            throw new IllegalStateException("clusterViewId must not be null or empty: " + clusterViewId);
+        }
+        if (localId < 0) {
+            throw new IllegalStateException("localId must be zero or higher: " + localId);
+        }
+        if (activeIds == null || activeIds.isEmpty()) {
+            throw new IllegalStateException("activeIds must not be null or empty");
+        }
+        if (deactivatingIds == null) {
+            throw new IllegalStateException("deactivatingIds must not be null");
+        }
+        if (inactiveIds == null) {
+            throw new IllegalStateException("inactiveIds must not be null");
+        }
+
+        json = asJson(viewSeqNum, viewFinal, clusterViewId, localId, activeIds, deactivatingIds, inactiveIds);
+    }
+
+    /**
+     * Converts the provided parameters into the clusterview json that will be
+     * provided via JMX.
+     */
+    private static String asJson(final long viewSeqNum, final boolean viewFinal, final String clusterViewId,
+            final int localId, final Set<Integer> activeIds, final Set<Integer> deactivatingIds,
+            final Set<Integer> inactiveIds) {
+        JsopBuilder builder = new JsopBuilder();
+        builder.object();
+        builder.key("seq").value(viewSeqNum);
+        builder.key("final").value(viewFinal);
+        builder.key("id").value(clusterViewId);
+        builder.key("me").value(localId);
+        appendIdArray(builder, "active", activeIds);
+        appendIdArray(builder, "deactivating", deactivatingIds);
+        appendIdArray(builder, "inactive", inactiveIds);
+        builder.endObject();
+        return builder.toString();
+    }
+
+    /** Appends the given key followed by a json array containing all ids. */
+    private static void appendIdArray(final JsopBuilder builder, final String key, final Set<Integer> ids) {
+        builder.key(key).array();
+        for (Integer id : ids) {
+            builder.value(id);
+        }
+        builder.endArray();
+    }
+
+    /** Debugging toString() **/
+    @Override
+    public String toString() {
+        return "a ClusterView[" + json + "]";
+    }
+
+    /** This is the main getter that will be polled via JMX **/
+    String asDescriptorValue() {
+        return json;
+    }
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterView.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java?rev=1697355&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java Mon Aug 24 11:25:48 2015
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.commons.json.JsopBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents the document stored in the settings collection containing a
+ * 'cluster view'.
+ * <p>
+ * A 'cluster view' is the state of the membership of instances that are or have
+ * all been connected to the same oak repository. The 'cluster view' is
+ * maintained by all instances in the cluster concurrently - the faster one
+ * wins. Its information is derived from the clusterNodes collection. From there
+ * the following three states are derived and instances are grouped into these:
+ * <ul>
+ * <li>Active: an instance is active and has no recoveryLock is currently
+ * acquired. The lease timeout is ignored. When the lease times out, this is
+ * noticed by one of the instances at some point and a recovery is started, at
+ * which point the instance transitions from 'Active' to 'Recovering'.</li>
+ * <li>Recovering: an instance that was active but currently has the
+ * recoveryLock acquired by one of the instances.</li>
+ * <li>Inactive: an instance is not set to active (in which case the
+ * recoveryLock is never set)</li>
+ * </ul>
+ * <p>
+ * Note that the states managed in this ClusterViewDocument differs from the one
+ * from ClusterView - since ClusterView also manages the fact that after a
+ * recovery of a crashed instance there could be a 'backlog' of changes which it
+ * doesn't yet see until a background read is performed.
+ */
+class ClusterViewDocument {
+
+ private static final Logger logger = LoggerFactory.getLogger(ClusterViewDocument.class);
+
+ /** the id of this document (in the settings collection) is always 'clusterView' **/
+ private static final String CLUSTERVIEW_DOC_ID = "clusterView";
+
+ // keys that we store in the root document - and in the history
+ /**
+ * document key that stores the stable id of the cluster (will never change)
+ * (Note: a better term would have been just clusterId - but that one is
+ * already occupied with what should actually be called clusterNodeId or
+ * just nodeId)
+ **/
+ static final String CLUSTER_VIEW_ID_KEY = "clusterViewId";
+
+ /**
+ * document key that stores the monotonically incrementing sequence number
+ * of the cluster view. Any update will increase this by 1
+ **/
+ static final String VIEW_SEQ_NUM_KEY = "seqNum";
+
+ /**
+ * document key that stores the comma-separated list of active instance ids
+ **/
+ static final String ACTIVE_KEY = "active";
+
+ /**
+ * document key that stores the comma-separated list of inactive instance
+ * ids (they might still have a backlog - that is handled in ClusterView
+ * though and never persisted)
+ */
+ static final String INACTIVE_KEY = "inactive";
+
+ /**
+ * document key that stores the comma-separated list of recovering instance
+ * ids
+ **/
+ static final String RECOVERING_KEY = "recovering";
+
+ /**
+ * document key that stores the date and time when this view was created -
+ * for debugging purpose only
+ **/
+ private static final String CREATED_KEY = "created";
+
+ /**
+ * document key that stores the id of the instance that created this view -
+ * for debugging purpose only
+ **/
+ private static final String CREATOR_KEY = "creator";
+
+ /**
+ * document key that stores the date and time when this view was retired -
+ * for debugging purpose only
+ **/
+ private static final String RETIRED_KEY = "retired";
+
+ /**
+ * document key that stores the id of the instance that retired this view -
+ * for debugging purpose only
+ **/
+ private static final String RETIRER_KEY = "retirer";
+
+ /**
+ * document key that stores a short, limited history of previous cluster
+ * views - for debugging purpose only
+ **/
+ private static final String CLUSTER_VIEW_HISTORY_KEY = "clusterViewHistory";
+
+ /**
+ * the format used when storing date+time (date, time with milliseconds and
+ * numeric zone offset).
+ * NOTE(review): SimpleDateFormat is not thread-safe, yet this single static
+ * instance is used by the static methods readOrUpdate and asHistoryEntry -
+ * if those can run on several threads at once, every use must be guarded
+ * (eg by synchronizing on this instance).
+ **/
+ private static final DateFormat standardDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+
+ /** maximum number of elements kept in the CLUSTER_VIEW_HISTORY_KEY field **/
+ private static final int HISTORY_LIMIT = 10;
+
+ /** the monotonically incrementing sequence number of this cluster view **/
+ private final long viewSeqNum;
+
+ /** the stable id of this cluster **/
+ private final String clusterViewId;
+
+ /** the ids of instances that are active at this moment **/
+ private final Integer[] activeIds;
+
+ /**
+ * the ids of instances that are recovering (lastRev-recovery) at this
+ * moment
+ **/
+ private final Integer[] recoveringIds;
+
+ /** the ids of instances that are inactive at this moment **/
+ private final Integer[] inactiveIds;
+
+ /**
+ * the short, limited history of previous cluster views, for debugging only.
+ * Keys are revisions (either as String or as Revision, see
+ * applyHistoryLimit), values are json strings describing the retired view
+ **/
+ private final Map<Object, String> viewHistory;
+
+ /** the date+time at which this view was created, for debugging only **/
+ private final String createdAt;
+
+ /** the id of the instance that created this view, for debugging only **/
+ private final Integer createdBy;
+
+    /**
+     * Main method by which the ClusterViewDocument is updated in the settings
+     * collection.
+     * <p>
+     * The currently persisted view is read first; if it already matches the
+     * provided ids it is returned unchanged. Otherwise a new view is written:
+     * if there is no previous view, the very first one is created (sequence
+     * number 1 and a newly generated random clusterViewId); otherwise the
+     * previous view is updated conditionally - only if its sequence number is
+     * still the one that was read - with the sequence number incremented by
+     * one. The retired view is added to a history limited to HISTORY_LIMIT
+     * entries. Finally the document is re-read to verify that the persisted
+     * view is the one just written.
+     *
+     * @param documentNodeStore
+     *            provides the local cluster id (recorded as creator) and the
+     *            DocumentStore holding the settings collection
+     * @param activeIds
+     *            the ids of the currently active instances - must not be null
+     *            or empty
+     * @param recoveringIds
+     *            the ids of the instances currently being recovered
+     * @param inactiveIds
+     *            the ids of the currently inactive instances
+     * @return the resulting ClusterViewDocument as just updated in the settings
+     *         collection (or the persisted one if it already matched) - or
+     *         null if the update could not be confirmed, eg because another
+     *         instance was updating the clusterview concurrently (in which
+     *         case the caller should re-read first and possibly re-update if
+     *         needed)
+     * @throws IllegalStateException
+     *             if activeIds is null or empty
+     */
+    static ClusterViewDocument readOrUpdate(DocumentNodeStore documentNodeStore, Set<Integer> activeIds, Set<Integer> recoveringIds,
+            Set<Integer> inactiveIds) {
+        logger.trace("readOrUpdate: expected: activeIds: {}, recoveringIds: {}, inactiveIds: {}", activeIds, recoveringIds,
+                inactiveIds);
+        if (activeIds == null || activeIds.isEmpty()) {
+            logger.info("readOrUpdate: activeIds must not be null or empty");
+            throw new IllegalStateException("activeIds must not be null or empty");
+        }
+        int localClusterId = documentNodeStore.getClusterId();
+
+        final ClusterViewDocument previousView = doRead(documentNodeStore);
+        if (previousView != null && previousView.matches(activeIds, recoveringIds, inactiveIds)) {
+            logger.trace("readOrUpdate: view unchanged, returning: {}", previousView);
+            return previousView;
+        }
+        logger.trace(
+                "readOrUpdate: view change detected, going to update from {} to activeIds: {}, recoveringIds: {}, inactiveIds: {}",
+                previousView, activeIds, recoveringIds, inactiveIds);
+        UpdateOp updateOp = new UpdateOp(CLUSTERVIEW_DOC_ID, true);
+        Date now = new Date();
+        final String createdTimestamp;
+        // SimpleDateFormat is not thread-safe: the shared static instance must
+        // never be used by several threads at the same time
+        synchronized (standardDateFormat) {
+            createdTimestamp = standardDateFormat.format(now);
+        }
+        updateOp.set(ACTIVE_KEY, setToCsv(activeIds));
+        updateOp.set(RECOVERING_KEY, setToCsv(recoveringIds));
+        updateOp.set(INACTIVE_KEY, setToCsv(inactiveIds));
+        updateOp.set(CREATED_KEY, createdTimestamp);
+        updateOp.set(CREATOR_KEY, localClusterId);
+        Map<Object, String> historyMap = new HashMap<Object, String>();
+        if (previousView != null) {
+            Map<Object, String> previousHistory = previousView.getHistory();
+            if (previousHistory != null) {
+                historyMap.putAll(previousHistory);
+            }
+
+            historyMap.put(Revision.newRevision(localClusterId).toString(), asHistoryEntry(previousView, localClusterId, now));
+        }
+        applyHistoryLimit(historyMap);
+        updateOp.set(CLUSTER_VIEW_HISTORY_KEY, historyMap);
+
+        final Long newViewSeqNum;
+        if (previousView == null) {
+            // looks like we are the first ever to define the clusterview - so
+            // we can use viewSeqNum==1 and we make sure no other cluster node
+            // tries to create this first one simultaneously - so we use
+            // 'create'
+
+            // going via 'create' requires ID to be set again (not only in new
+            // UpdateOp(id,isNew)):
+            updateOp.set(Document.ID, CLUSTERVIEW_DOC_ID);
+            ArrayList<UpdateOp> updateOps = new ArrayList<UpdateOp>();
+            newViewSeqNum = 1L;
+            updateOp.setNew(true); // paranoia as that's already set above
+            updateOp.set(VIEW_SEQ_NUM_KEY, newViewSeqNum);
+            // first view ever => choose a new unique clusterViewId
+            String clusterViewId = UUID.randomUUID().toString();
+            updateOp.set(CLUSTER_VIEW_ID_KEY, clusterViewId);
+            updateOps.add(updateOp);
+            logger.debug("updateAndRead: trying to create the first ever clusterView - hence {}={} and {}={}", VIEW_SEQ_NUM_KEY,
+                    newViewSeqNum, CLUSTER_VIEW_ID_KEY, clusterViewId);
+            if (!documentNodeStore.getDocumentStore().create(Collection.SETTINGS, updateOps)) {
+                logger.debug("updateAndRead: someone else just created the first view ever while I tried - reread that one later");
+                return null;
+            }
+        } else {
+            // there were earlier clusterViews (the normal case) - thus we use
+            // 'findAndUpdate' with the condition that the view seqNum is
+            // still at the previous view's one
+            Long previousViewSeqNum = previousView.getViewSeqNum();
+            updateOp.setNew(false); // change to false from true above
+            updateOp.equals(VIEW_SEQ_NUM_KEY, null, previousViewSeqNum);
+            newViewSeqNum = previousViewSeqNum + 1;
+            updateOp.set(VIEW_SEQ_NUM_KEY, newViewSeqNum);
+            logger.debug("updateAndRead: trying to update the clusterView to {}={} ", VIEW_SEQ_NUM_KEY, newViewSeqNum);
+            if (documentNodeStore.getDocumentStore().findAndUpdate(Collection.SETTINGS, updateOp) == null) {
+                logger.debug(
+                        "updateAndRead: someone else just updated the view which I wanted to do as well - reread that one later");
+                return null;
+            }
+        }
+
+        // whatever the outcome of the above - we don't care - re-reading will
+        // in any case definitely show what has been persisted: if the re-read
+        // view contains the same seqNum, it is what we have written -
+        // otherwise someone else came in between and we have to step back and
+        // retry
+        ClusterViewDocument readResult = doRead(documentNodeStore);
+        if (readResult == null) {
+            logger.debug("updateAndRead: got null from read - whatever the exact reason, we must retry in a moment.");
+            return null;
+        } else if (newViewSeqNum.equals(readResult.getViewSeqNum())) {
+            logger.debug("updateAndRead: matching view - no change");
+            return readResult;
+        } else {
+            logger.debug("updateAndRead: someone else in the cluster was updating right after I also succeeded - re-read in a bit");
+            return null;
+        }
+    }
+
+    /**
+     * Prunes the given history map in place until it holds no more than
+     * HISTORY_LIMIT entries, each round dropping the entry whose key denotes
+     * the oldest revision.
+     * <p>
+     * Keys may be Strings or Revisions; both are compared via their string
+     * form. Removal is attempted with the string key first and with the parsed
+     * Revision second; if neither removes anything, pruning stops so the loop
+     * can never spin forever.
+     *
+     * @param historyMap
+     *            the map to prune - modified directly
+     */
+    private static void applyHistoryLimit(Map<Object, String> historyMap) {
+        while (historyMap.size() > HISTORY_LIMIT) {
+            String oldest = null;
+            for (Object key : historyMap.keySet()) {
+                // key is either a String or a Revision - toString() suits both
+                String candidate = key.toString();
+                boolean isOlder = oldest == null
+                        || Revision.getTimestampDifference(Revision.fromString(candidate), Revision.fromString(oldest)) < 0;
+                if (isOlder) {
+                    oldest = candidate;
+                }
+            }
+            if (oldest == null) {
+                return;
+            }
+            boolean removed = historyMap.remove(oldest) != null
+                    || historyMap.remove(Revision.fromString(oldest)) != null;
+            if (!removed) {
+                return;
+            }
+        }
+    }
+
+ /** Converts a previous clusterView document into a history 'string' **/
+ private static String asHistoryEntry(final ClusterViewDocument previousView, int retiringClusterNodeId, Date retireTime) {
+ String h;
+ JsopBuilder b = new JsopBuilder();
+ b.object();
+ b.key(VIEW_SEQ_NUM_KEY);
+ b.value(previousView.getViewSeqNum());
+ b.key(CREATED_KEY);
+ b.value(String.valueOf(previousView.getCreatedAt()));
+ b.key(CREATOR_KEY);
+ b.value(previousView.getCreatedBy());
+ b.key(RETIRED_KEY);
+ b.value(String.valueOf(standardDateFormat.format(retireTime)));
+ b.key(RETIRER_KEY);
+ b.value(retiringClusterNodeId);
+ b.key(ACTIVE_KEY);
+ b.value(setToCsv(previousView.getActiveIds()));
+ b.key(RECOVERING_KEY);
+ b.value(setToCsv(previousView.getRecoveringIds()));
+ b.key(INACTIVE_KEY);
+ b.value(setToCsv(previousView.getInactiveIds()));
+ b.endObject();
+ h = b.toString();
+ return h;
+ }
+
+    /**
+     * Helper method to convert a set to a comma-separated string (without
+     * relying on the set's toString() for safety).
+     *
+     * @param ids
+     *            the ids to convert, may be null
+     * @return null if the set is null or empty, otherwise the ids in the set's
+     *         iteration order, separated by commas (no spaces)
+     */
+    private static String setToCsv(Set<Integer> ids) {
+        if (ids == null || ids.isEmpty()) {
+            return null;
+        }
+        // method-local buffer: no need for the synchronized StringBuffer
+        StringBuilder sb = new StringBuilder();
+        for (Integer id : ids) {
+            if (sb.length() != 0) {
+                sb.append(',');
+            }
+            sb.append(id);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Helper method to convert an array to a comma-separated string.
+     *
+     * @param arr
+     *            the ids to convert, may be null
+     * @return null if the array is null or empty, otherwise the elements in
+     *         array order, separated by commas (no spaces)
+     */
+    static String arrayToCsv(Integer[] arr) {
+        if (arr == null || arr.length == 0) {
+            return null;
+        }
+        // method-local buffer: no need for the synchronized StringBuffer
+        StringBuilder sb = new StringBuilder();
+        for (Integer a : arr) {
+            if (sb.length() != 0) {
+                sb.append(',');
+            }
+            sb.append(a);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Inverse helper method of {@link #arrayToCsv(Integer[])}: converts a
+     * comma-separated string into an integer array.
+     * <p>
+     * Whitespace around the individual numbers is ignored, and an empty (or
+     * blank) string results in an empty array rather than a
+     * NumberFormatException.
+     *
+     * @param csv
+     *            the comma-separated string, may be null
+     * @return null if csv is null, otherwise the parsed integers in order
+     * @throws NumberFormatException
+     *             if an element is not a valid integer
+     **/
+    static Integer[] csvToIntegerArray(String csv) {
+        if (csv == null) {
+            return null;
+        }
+        String trimmed = csv.trim();
+        if (trimmed.isEmpty()) {
+            return new Integer[0];
+        }
+        String[] split = trimmed.split(",");
+        Integer[] result = new Integer[split.length];
+        for (int i = 0; i < split.length; i++) {
+            result[i] = Integer.valueOf(split[i].trim());
+        }
+        return result;
+    }
+
+ /**
+ * internal reader of an existing clusterView document from the settings
+ * collection
+ **/
+ private static ClusterViewDocument doRead(DocumentNodeStore documentNodeStore) {
+ DocumentStore documentStore = documentNodeStore.getDocumentStore();
+ Document doc = documentStore.find(Collection.SETTINGS, "clusterView",
+ -1 /* -1; avoid caching */);
+ if (doc == null) {
+ return null;
+ } else {
+ ClusterViewDocument clusterView = new ClusterViewDocument(doc);
+ if (clusterView.isValid()) {
+ return clusterView;
+ } else {
+ logger.warn("read: clusterView document is not valid: " + doc.format());
+ return null;
+ }
+ }
+ }
+
+    /**
+     * Comparison helper that compares an integer array with a set, treating
+     * the array as a set: they match if they contain exactly the same ids. A
+     * null or empty array matches a null or empty set.
+     * <p>
+     * Comparing only the sizes and containment would wrongly consider an
+     * array with duplicates (eg [1,1]) to match a larger set (eg {1,2}). The
+     * array is therefore converted to a set first.
+     *
+     * @param expected
+     *            the ids as array, may be null
+     * @param actual
+     *            the ids as set, may be null
+     * @return true if both represent the same set of ids
+     **/
+    static boolean matches(Integer[] expected, Set<Integer> actual) {
+        boolean expectedIsEmpty = expected == null || expected.length == 0;
+        boolean actualIsEmpty = actual == null || actual.isEmpty();
+        if (expectedIsEmpty || actualIsEmpty) {
+            // if one is null/empty they only match if the other one is too
+            return expectedIsEmpty && actualIsEmpty;
+        }
+        return new HashSet<Integer>(Arrays.asList(expected)).equals(actual);
+    }
+
+    /**
+     * Creates a ClusterViewDocument from the given settings document.
+     * <p>
+     * Missing or non-String id lists are treated as empty. A missing view
+     * sequence number results in -1, which makes the view invalid (see
+     * {@link #isValid()}) instead of failing with a NullPointerException. A
+     * missing creator id results in -1 as well. Numeric values are read via
+     * {@link Number}, so they are accepted whether the document holds them as
+     * Long or as Integer.
+     *
+     * @param doc
+     *            the document to read from, must not be null
+     * @throws IllegalArgumentException
+     *             if doc is null
+     */
+    @SuppressWarnings("unchecked")
+    ClusterViewDocument(Document doc) {
+        if (doc == null) {
+            throw new IllegalArgumentException("doc must not be null");
+        }
+        this.clusterViewId = (String) doc.get(CLUSTER_VIEW_ID_KEY);
+        Number seqNum = (Number) doc.get(VIEW_SEQ_NUM_KEY);
+        this.viewSeqNum = seqNum == null ? -1 : seqNum.longValue();
+        this.createdAt = (String) doc.get(CREATED_KEY);
+        Number creator = (Number) doc.get(CREATOR_KEY);
+        this.createdBy = creator == null ? -1 : creator.intValue();
+
+        this.activeIds = readIds(doc, ACTIVE_KEY);
+        this.recoveringIds = readIds(doc, RECOVERING_KEY);
+        this.inactiveIds = readIds(doc, INACTIVE_KEY);
+
+        Object history = doc.get(CLUSTER_VIEW_HISTORY_KEY);
+        if (history instanceof Map) {
+            this.viewHistory = (Map<Object, String>) history;
+        } else {
+            logger.trace("<init> viewHistory is null");
+            this.viewHistory = null;
+        }
+    }
+
+    /**
+     * Reads a comma-separated id list stored under the given key.
+     *
+     * @return the parsed ids, or an empty array if the value is missing or not
+     *         a String
+     */
+    private static Integer[] readIds(Document doc, String key) {
+        Object value = doc.get(key);
+        if (value instanceof String) {
+            return csvToIntegerArray((String) value);
+        }
+        logger.trace("<init>: {} : {}", key, value);
+        return new Integer[0];
+    }
+
+    /**
+     * Returns the active ids of this cluster view as a freshly created set;
+     * modifying it does not affect this view.
+     **/
+    Set<Integer> getActiveIds() {
+        Set<Integer> ids = new HashSet<Integer>();
+        for (Integer id : activeIds) {
+            ids.add(id);
+        }
+        return ids;
+    }
+
+    /**
+     * Returns the recovering ids of this cluster view as a freshly created
+     * set; modifying it does not affect this view.
+     **/
+    Set<Integer> getRecoveringIds() {
+        Set<Integer> ids = new HashSet<Integer>();
+        for (Integer id : recoveringIds) {
+            ids.add(id);
+        }
+        return ids;
+    }
+
+    /**
+     * Returns the inactive ids of this cluster view as a freshly created set;
+     * modifying it does not affect this view.
+     **/
+    Set<Integer> getInactiveIds() {
+        Set<Integer> ids = new HashSet<Integer>();
+        for (Integer id : inactiveIds) {
+            ids.add(id);
+        }
+        return ids;
+    }
+
+    /**
+     * Returns the view history map as read from the document (not a copy), or
+     * null if the document had no history map.
+     **/
+    private Map<Object, String> getHistory() {
+        return this.viewHistory;
+    }
+
+ @Override
+ public String toString() {
+ return "a ClusterView[valid=" + isValid() + ", viewSeqNum=" + viewSeqNum + ", clusterViewId=" + clusterViewId
+ + ", activeIds=" + arrayToCsv(activeIds) + ", recoveringIds=" + arrayToCsv(recoveringIds) + ", inactiveIds="
+ + arrayToCsv(inactiveIds) + "]";
+ }
+
+    /**
+     * A view is valid if its sequence number is non-negative and it has at
+     * least one active id.
+     */
+    boolean isValid() {
+        if (viewSeqNum < 0) {
+            return false;
+        }
+        return activeIds != null && activeIds.length > 0;
+    }
+
+    /**
+     * Returns when this view was created (date+time as stored in the
+     * document) - intended for debugging purposes only.
+     **/
+    String getCreatedAt() {
+        return this.createdAt;
+    }
+
+    /**
+     * Returns the clusterNodeId of the instance that created this view -
+     * intended for debugging purposes only.
+     **/
+    int getCreatedBy() {
+        return this.createdBy;
+    }
+
+    /** Returns the monotonically incrementing sequence number of this view **/
+    long getViewSeqNum() {
+        return this.viewSeqNum;
+    }
+
+    /**
+     * Returns the id (a UUID) representing this cluster. It is meant to never
+     * change but to be carried over from view to view.
+     **/
+    String getClusterViewId() {
+        return this.clusterViewId;
+    }
+
+    /**
+     * Checks whether this view consists of exactly the given active,
+     * recovering and inactive ids (each compared via the static matches
+     * helper, stopping at the first mismatch).
+     */
+    private boolean matches(Set<Integer> activeIds, Set<Integer> recoveringIds, Set<Integer> inactiveIds) {
+        return matches(this.activeIds, activeIds)
+                && matches(this.recoveringIds, recoveringIds)
+                && matches(this.inactiveIds, inactiveIds);
+    }
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java?rev=1697355&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java Mon Aug 24 11:25:48 2015
@@ -0,0 +1,660 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import javax.jcr.Value;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.jackrabbit.commons.SimpleValueFactory;
+import org.apache.jackrabbit.oak.api.Descriptors;
+import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The DocumentDiscoveryLiteService is taking care of providing a repository
+ * descriptor that contains the current cluster-view details.
+ * <p>
+ * The clusterView is provided via a repository descriptor (see
+ * OAK_DISCOVERYLITE_CLUSTERVIEW)
+ * <p>
+ * The cluster-view lists all instances (ever) known in the cluster in one of
+ * the following states:
+ * <ul>
+ * <li>active: the instance is currently running and has an up-to-date lease
+ * </li>
+ * <li>deactivating: the instance failed to update the lease recently thus a
+ * recovery is happening - or it has just finished and the local instance is yet
+ * to do a backgroundRead before it has finished reading the crashed/shutdown
+ * instance's last changes</li>
+ * <li>inactive: the instance is currently not running and all its changes have
+ * been seen by the local instance</li>
+ * </ul>
+ * <p>
+ * Additionally, the cluster-view is assigned a monotonically increasing
+ * sequence number to. This sequence number is persisted, thus all instances in
+ * the cluster will show the same sequence number for a particular cluster-view
+ * in time.
+ * <p>
+ * Note that the 'deactivating' state might be hiding some complexity that is
+ * deliberately not shown: for the documentNS the state 'deactivating' consists
+ * of two substates: 'recovering' as in _lastRevs are updated, and 'backlog
+ * processing' for a pending backgroundRead to get the latest head state of a
+ * crashed/shutdown instance. So when an instance is in 'deactivating' state, it
+ * is not indicated via the cluster-view whether it is recovering or has backlog
+ * to process. However, the fact that an instance has to yet do a backgroundRead
+ * to get changes is a per-instance story: other instances might already have
+ * done the backgroundRead and thus no longer have a backlog for the instance(s)
+ * that left. Even though 'deactivating' therefore is dependent on the instance
+ * you get the information from, the cluster-view must have a sequence number
+ * that uniquely identifies it in the cluster. These two constraints conflict.
+ * As a simple solution to handle this case nevertheless, the 'final' flag has
+ * been introduced: the cluster-view has this flag 'final' set to true when the
+ * view is final and nothing will be changed in this sequence number anymore. If
+ * the 'final' flag is false however it indicates that the cluster-view with
+ * this particular sequence number might still experience a change (more
+ * concretely: the deactivating instances might change). Note that there
+ * alternatives to this 'final' flag have been discussed, such as using
+ * vector-counters, but there was no obvious gain achieve using an alternative
+ * approach.
+ * <p>
+ * In other words: whenever the 'final' flag is false, the view must be
+ * interpreted as 'in flux' wrt the deactivating/inactive instances and any
+ * action that depends on stable deactivating/inactive instances must not yet be
+ * done until the 'final' flag becomes true.
+ * <p>
+ * Underneath, the DocumentDiscoveryLiteService uses the clusterNodes collection
+ * to derive the clusterView, which it stores in the settings collection.
+ * Whenever it updates the clusterView it increments the sequence number by 1.
+ * <p>
+ * While this new 'clusterView' document in the settings collection sounds like
+ * redundant data (since it is just derived from the clusterNodes), it actually
+ * is not. By persisting the clusterView it becomes the new source of truth wrt
+ * what the clusterView looks like. And no two instances in the same cluster can
+ * make different conclusions based eg on different clocks they have or based on
+ * reading the clusterNodes in a slightly different moment etc. Also, the
+ * clusterView allows to store the currently two additional attributes: the
+ * clusterViewId (which is the permanent id for this cluster similar to the
+ * slingId being a permanent id for an instance) as well as the sequence number
+ * (which allows the instances to make reference to the same clusterView, and be
+ * able to simply detect whether anything has changed)
+ * <p>
+ * Prerequisites that the clusterView mechanism is stable:
+ * <ul>
+ * <li>the machine clocks are reasonably in sync - that is, they should be off
+ * by magnitudes less than the lease updateFrequency/timeout</li>
+ * <li>the write-delays from any instance to the mongo server where the
+ * clusterNodes and settings collections are stored should be very fast - at
+ * least orders of magnitudes lower again than the lease timeout</li>
+ * <li>when this instance notices that others have kicked it out of the
+ * clusterView (which it can find out when either its clusterNodes document is
+ * set to recovering or it is not in the clusterView anymore, although it just
+ * was - ie not just because of a fresh start), then this instance must step
+ * back gracefully. The exact definition is to be applied elsewhere - but it
+ * should include: stopping to update its own lease, waiting for the view to
+ * have stabilized - waiting for recovery of its own instance by the remaining
+ * instances in the cluster to have finished - and then probably waiting for
+ * another gracePeriod until it might rejoin the cluster. In between, any commit
+ * should fail with BannedFromClusterException</li>
+ * </ul>
+ *
+ * @see #OAK_DISCOVERYLITE_CLUSTERVIEW
+ */
+@Component(immediate = true, name = DocumentDiscoveryLiteService.COMPONENT_NAME)
+@Service(value = { DocumentDiscoveryLiteService.class, Observer.class })
+public class DocumentDiscoveryLiteService implements ClusterStateChangeListener, Observer {
+
+ static final String COMPONENT_NAME = "org.apache.jackrabbit.oak.plugins.document.DocumentDiscoveryLiteService";
+
+ /**
+ * Name of the repository descriptor via which the clusterView is published
+ * - which is the raison d'etre of the DocumentDiscoveryLiteService
+ **/
+ public static final String OAK_DISCOVERYLITE_CLUSTERVIEW = "oak.discoverylite.clusterview";
+
+ private static final Logger logger = LoggerFactory.getLogger(DocumentDiscoveryLiteService.class);
+
+ /** describes the reason why the BackgroundWorker should be woken up **/
+ private static enum WakeupReason {
+ CLUSTER_STATE_CHANGED, BACKGROUND_READ_FINISHED
+ }
+
+ /**
+ * The BackgroundWorker is taking care of regularly invoking checkView -
+ * which in turn will detect if anything changed
+ **/
+ private class BackgroundWorker implements Runnable {
+
+ final Random random = new Random();
+
+ boolean stopped = false;
+
+ private void stop() {
+ logger.trace("stop: start");
+ synchronized (BackgroundWorker.this) {
+ stopped = true;
+ }
+ logger.trace("stop: end");
+ }
+
+ @Override
+ public void run() {
+ logger.info("BackgroundWorker.run: start");
+ try {
+ doRun();
+ } finally {
+ logger.info("BackgroundWorker.run: end {finally}");
+ }
+ }
+
+ private void doRun() {
+ while (!stopped) {
+ try {
+ logger.trace("BackgroundWorker.doRun: going to call checkView");
+ boolean shortSleep = checkView();
+ logger.trace("BackgroundWorker.doRun: checkView terminated with {} (=shortSleep)", shortSleep);
+ long sleepMillis = shortSleep ? (50 + random.nextInt(450)) : 5000;
+ logger.trace("BackgroundWorker.doRun: sleeping {}ms", sleepMillis);
+ synchronized (BackgroundWorker.this) {
+ if (stopped)
+ return;
+ BackgroundWorker.this.wait(sleepMillis);
+ if (stopped)
+ return;
+ }
+ logger.trace("BackgorundWorker.doRun: done sleeping, looping");
+ } catch (Exception e) {
+ logger.error("doRun: got an exception: " + e, e);
+ try {
+ Thread.sleep(5000);
+ } catch (Exception e2) {
+ logger.error("doRun: got an exception while sleeping due to another exception: " + e2, e2);
+ }
+ }
+ }
+ }
+
+ }
+
+ /** This provides the 'clusterView' repository descriptors **/
+ private class DiscoveryLiteDescriptor implements Descriptors {
+
+ final SimpleValueFactory factory = new SimpleValueFactory();
+
+ @Override
+ public String[] getKeys() {
+ return new String[] { OAK_DISCOVERYLITE_CLUSTERVIEW };
+ }
+
+ @Override
+ public boolean isStandardDescriptor(String key) {
+ if (!OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean isSingleValueDescriptor(String key) {
+ if (!OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public Value getValue(String key) {
+ if (!OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key)) {
+ return null;
+ }
+ return factory.createValue(getClusterViewAsDescriptorValue());
+ }
+
+ @Override
+ public Value[] getValues(String key) {
+ if (!OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key)) {
+ return null;
+ }
+ return new Value[] { getValue(key) };
+ }
+
+ }
+
+ /** DocumentNodeStore's (hence local) clusterId **/
+ private int clusterNodeId = -1;
+
+ /**
+ * the DocumentNodeStore - used to get the active/inactive cluster ids from
+ **/
+ private DocumentNodeStore documentNodeStore;
+
+ /**
+ * background job that periodically verifies and updates the clusterView
+ **/
+ private BackgroundWorker backgroundWorker;
+
+ /** the ClusterViewDocument which was used in the last checkView run **/
+ private ClusterViewDocument previousClusterViewDocument;
+
+ /**
+ * the ClusterView that was valid as a result of the previous checkView run
+ **/
+ private ClusterView previousClusterView;
+
+ /**
+ * kept volatile as this is frequently read in contentChanged which is
+ * better kept unsynchronized as long as possible
+ **/
+ private volatile boolean hasInstancesWithBacklog;
+
+ /**
+ * Require a static reference to the NodeStore. Note that this implies the
+ * service is only active for documentNS
+ **/
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY, policy = ReferencePolicy.STATIC)
+ private volatile DocumentNodeStore nodeStore;
+
+ /**
+ * inactive nodes that have been so for a while, ie they have no backlog
+ * anymore, so no need to check for backlog every time
+ **/
+ private Set<Integer> longTimeInactives = new HashSet<Integer>();
+
+ /**
+ * returns the clusterView as a json value for it to be provided via the
+ * repository descriptor
+ **/
+ private String getClusterViewAsDescriptorValue() {
+ if (previousClusterView == null) {
+ return null;
+ } else {
+ return previousClusterView.asDescriptorValue();
+ }
+ }
+
+ /**
+ * On activate the DocumentDiscoveryLiteService tries to start the
+ * background job
+ */
+ @Activate
+ public void activate(ComponentContext context) {
+ logger.trace("activate: start");
+
+ // set the ClusterStateChangeListener with the DocumentNodeStore
+ this.documentNodeStore = (DocumentNodeStore) nodeStore;
+ documentNodeStore.setClusterStateChangeListener(this);
+
+ // retrieve the clusterId
+ clusterNodeId = documentNodeStore.getClusterId();
+
+ // start the background worker
+ backgroundWorker = new BackgroundWorker();
+ Thread th = new Thread(backgroundWorker, "DocumentDiscoveryLiteService-BackgroundWorker-[" + clusterNodeId + "]");
+ th.setDaemon(true);
+ th.start();
+
+ // register the Descriptors - for Oak to pass it upwards
+ if (context != null) {
+ OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext());
+ whiteboard.register(Descriptors.class, new DiscoveryLiteDescriptor(), Collections.emptyMap());
+ }
+ logger.trace("activate: end");
+ }
+
+ /**
+ * On deactivate the background job is stopped - if it was running at all
+ **/
+ @Deactivate
+ protected void deactivate() {
+ logger.trace("deactivate: deactivated");
+
+ if (backgroundWorker != null) {
+ backgroundWorker.stop();
+ backgroundWorker = null;
+ }
+ logger.trace("deactivate: end");
+ }
+
+ /**
+ * Checks if anything changed in the current view and updates the service
+ * fields accordingly.
+ *
+ * @return true if anything changed or is about to be changed (eg
+ * recovery/backlog), false if the view is stable
+ */
+ private boolean checkView() {
+ logger.trace("checkView: start");
+ List<ClusterNodeInfoDocument> allClusterNodes = ClusterNodeInfoDocument.all(documentNodeStore.getDocumentStore());
+
+ Map<Integer, ClusterNodeInfoDocument> allNodeIds = new HashMap<Integer, ClusterNodeInfoDocument>();
+ Map<Integer, ClusterNodeInfoDocument> activeNotTimedOutNodes = new HashMap<Integer, ClusterNodeInfoDocument>();
+ Map<Integer, ClusterNodeInfoDocument> activeButTimedOutNodes = new HashMap<Integer, ClusterNodeInfoDocument>();
+ Map<Integer, ClusterNodeInfoDocument> recoveringNodes = new HashMap<Integer, ClusterNodeInfoDocument>();
+ Map<Integer, ClusterNodeInfoDocument> backlogNodes = new HashMap<Integer, ClusterNodeInfoDocument>();
+ Map<Integer, ClusterNodeInfoDocument> inactiveNoBacklogNodes = new HashMap<Integer, ClusterNodeInfoDocument>();
+
+ for (Iterator<ClusterNodeInfoDocument> it = allClusterNodes.iterator(); it.hasNext();) {
+ ClusterNodeInfoDocument clusterNode = it.next();
+ allNodeIds.put(clusterNode.getClusterId(), clusterNode);
+ if (clusterNode.isBeingRecovered()) {
+ recoveringNodes.put(clusterNode.getClusterId(), clusterNode);
+ } else if (!clusterNode.isActive()) {
+ if (hasBacklog(clusterNode)) {
+ backlogNodes.put(clusterNode.getClusterId(), clusterNode);
+ } else {
+ inactiveNoBacklogNodes.put(clusterNode.getClusterId(), clusterNode);
+ }
+ } else if (clusterNode.getLeaseEndTime() < System.currentTimeMillis()) {
+ activeButTimedOutNodes.put(clusterNode.getClusterId(), clusterNode);
+ } else {
+ activeNotTimedOutNodes.put(clusterNode.getClusterId(), clusterNode);
+ }
+ }
+
+ // the current view should now consist of:
+ // activeNotTimedOutNodes and activeButTimedOutNodes!
+ // (reason for including the timedout: they will yet have to
+ // switch to recovering or inactive - but we DONT KNOW yet.. that's
+ // predicting the future - so so far we have to stick with
+ // including them in the view)
+ Map<Integer, ClusterNodeInfoDocument> allActives;
+ allActives = new HashMap<Integer, ClusterNodeInfoDocument>(activeNotTimedOutNodes);
+ allActives.putAll(activeButTimedOutNodes);
+
+ // terminology:
+ // 'inactivating' are nodes that are either 'recovering' or 'backlog'
+ // ones
+ // 'recovering' are nodes for which one node is doing the recover() of
+ // lastRevs
+ // 'backlog' ones are nodes that are no longer active, that have
+ // finished the
+ // recover() but for which a backgroundRead is still pending to read
+ // the latest root changes.
+
+ logger.debug(
+ "checkView: active nodes: {}, timed out nodes: {}, recovering nodes: {}, backlog nodes: {}, inactive nodes: {}, total: {}, hence view nodes: {}",
+ activeNotTimedOutNodes.size(), activeButTimedOutNodes.size(), recoveringNodes.size(), backlogNodes.size(),
+ inactiveNoBacklogNodes.size(), allNodeIds.size(), allActives.size());
+
+ ClusterViewDocument originalView = previousClusterViewDocument;
+ ClusterViewDocument newView = doCheckView(allActives.keySet(), recoveringNodes.keySet(), backlogNodes.keySet(),
+ inactiveNoBacklogNodes.keySet());
+ if (newView == null) {
+ logger.trace("checkView: end. newView: null");
+ return true;
+ }
+ boolean newHasInstancesWithBacklog = recoveringNodes.size() > 0 || backlogNodes.size() > 0;
+ boolean changed = originalView == null || (newView.getViewSeqNum() != originalView.getViewSeqNum())
+ || (newHasInstancesWithBacklog != hasInstancesWithBacklog);
+ logger.debug("checkView: viewFine: {}, changed: {}, originalView: {}, newView: {}", newView != null, changed, originalView,
+ newView);
+
+ if (longTimeInactives.addAll(inactiveNoBacklogNodes.keySet())) {
+ logger.debug("checkView: updated longTimeInactives to {} (inactiveNoBacklogNodes: {})", longTimeInactives,
+ inactiveNoBacklogNodes);
+ }
+
+ if (changed) {
+ ClusterView v = ClusterView.fromDocument(clusterNodeId, newView, backlogNodes.keySet());
+ ClusterView previousView = previousClusterView;
+ previousClusterView = v;
+ hasInstancesWithBacklog = newHasInstancesWithBacklog;
+ logger.info("checkView: view changed from: " + previousView + ", to: " + v + ", hasInstancesWithBacklog: "
+ + hasInstancesWithBacklog);
+ return true;
+ } else {
+ logger.debug("checkView: no changes whatsoever, still at view: " + previousClusterView);
+ return hasInstancesWithBacklog;
+ }
+ }
+
+ private Revision getLastKnownRevision(int clusterNodeId) {
+ String[] lastKnownRevisions = documentNodeStore.getMBean().getLastKnownRevisions();
+ for (int i = 0; i < lastKnownRevisions.length; i++) {
+ String aLastKnownRevisionStr = lastKnownRevisions[i];
+ String[] split = aLastKnownRevisionStr.split("=");
+ if (split.length == 2) {
+ try {
+ Integer id = Integer.parseInt(split[0]);
+ if (id == clusterNodeId) {
+ final Revision lastKnownRev = Revision.fromString(split[1]);
+ logger.trace("getLastKnownRevision: end. clusterNode: {}, lastKnownRevision: {}", clusterNodeId,
+ lastKnownRev);
+ return lastKnownRev;
+ }
+ } catch (NumberFormatException nfe) {
+ logger.warn("getLastKnownRevision: could not parse integer '" + split[0] + "': " + nfe, nfe);
+ }
+ } else {
+ logger.warn("getLastKnownRevision: cannot parse lastKnownRevision: " + aLastKnownRevisionStr);
+ }
+ }
+ logger.warn("getLastKnownRevision: no lastKnownRevision found for " + clusterNodeId);
+ return null;
+ }
+
+ private boolean hasBacklog(ClusterNodeInfoDocument clusterNode) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("hasBacklog: start. clusterNodeId: {}", clusterNode.getClusterId());
+ }
+ Revision lastKnownRevision = getLastKnownRevision(clusterNode.getClusterId());
+ if (lastKnownRevision == null) {
+ logger.warn("hasBacklog: no lastKnownRevision found, hence cannot determine backlog for node "
+ + clusterNode.getClusterId());
+ return false;
+ }
+
+ // The lastKnownRevision is what the local instance has last read/seen
+ // from another instance.
+ // This must be compared to what the other instance *actually* has
+ // written as the very last thing.
+ // Now the knowledge what the other instance has last written (after
+ // recovery) would sit
+ // in the root document - so that could in theory be used. But reading
+ // the root document
+ // would have to be done *uncached*. And that's quite a change to what
+ // the original
+ // idea was: that the root document would only be read every second, to
+ // avoid contention.
+ // So this 'what the other instance has last written' information is
+ // retrieved via
+ // a new, dedicated property in the clusterNodes collection: the
+ // 'lastWrittenRootRev'.
+ // The 'lastWrittenRootRev' is written by 'UnsavedModifications' during
+ // backgroundUpdate
+ // and retrieved here quite regularly (but it should not be a big deal,
+ // as the
+ // discovery-lite is the only one reading this field so frequently and
+ // it does not
+ // interfere with normal (jcr) nodes at all).
+ String lastWrittenRootRevStr = clusterNode.getLastWrittenRootRev();
+ if (lastWrittenRootRevStr == null) {
+ logger.warn("hasBacklog: node has lastWrittenRootRev=null");
+ return false;
+ }
+ Revision lastWrittenRootRev = Revision.fromString(lastWrittenRootRevStr);
+ if (lastWrittenRootRev == null) {
+ logger.warn("hasBacklog: node has no lastWrittenRootRev: " + clusterNode.getClusterId());
+ return false;
+ }
+
+ boolean hasBacklog = Revision.getTimestampDifference(lastKnownRevision, lastWrittenRootRev) < 0;
+ if (logger.isDebugEnabled()) {
+ logger.debug("hasBacklog: clusterNodeId: {}, lastKnownRevision: {}, lastWrittenRootRev: {}, hasBacklog: {}",
+ clusterNode.getClusterId(), lastKnownRevision, lastWrittenRootRev, hasBacklog);
+ }
+ return hasBacklog;
+ }
+
+ private ClusterViewDocument doCheckView(final Set<Integer> activeNodes, final Set<Integer> recoveringNodes,
+ final Set<Integer> backlogNodes, final Set<Integer> inactiveNodes) {
+ logger.trace("doCheckView: start: activeNodes: {}, recoveringNodes: {}, backlogNodes: {}, inactiveNodes: {}", activeNodes,
+ recoveringNodes, backlogNodes, inactiveNodes);
+
+ Set<Integer> allInactives = new HashSet<Integer>();
+ allInactives.addAll(inactiveNodes);
+ allInactives.addAll(backlogNodes);
+
+ if (activeNodes.size() == 0) {
+ // then we have zero active nodes - that's nothing expected as that
+ // includes our own node not to be active
+ // hence handle with care - ie wait until we get an active node
+ logger.warn("doCheckView: empty active ids. activeNodes:{}, recoveringNodes:{}, inactiveNodes:{}", activeNodes,
+ recoveringNodes, inactiveNodes);
+ return null;
+ }
+ ClusterViewDocument newViewOrNull;
+ try {
+ newViewOrNull = ClusterViewDocument.readOrUpdate(documentNodeStore, activeNodes, recoveringNodes, allInactives);
+ } catch (RuntimeException re) {
+ logger.error("doCheckView: RuntimeException: re: " + re, re);
+ return null;
+ } catch (Error er) {
+ logger.error("doCheckView: Error: er: " + er, er);
+ return null;
+ }
+ logger.trace("doChckView: readOrUpdate result: {}", newViewOrNull);
+
+ // and now for some verbose documentation and logging:
+ if (newViewOrNull == null) {
+ // then there was a concurrent update of the clusterView
+ // and we should do some quick backoff sleeping
+ logger.debug("doCheckView: newViewOrNull is null: " + newViewOrNull);
+ return null;
+ } else {
+ // otherwise we now hold the newly valid view
+ // it could be the same or different to the previous one, let's
+ // check
+ if (previousClusterViewDocument == null) {
+ // oh ok, this is the very first one
+ previousClusterViewDocument = newViewOrNull;
+ logger.debug("doCheckView: end. first ever view: {}", newViewOrNull);
+ return newViewOrNull;
+ } else if (previousClusterViewDocument.getViewSeqNum() == newViewOrNull.getViewSeqNum()) {
+ // that's the normal case: the viewId matches, nothing has
+ // changed, we've already
+ // processed the previousClusterView, so:
+ logger.debug("doCheckView: end. seqNum did not change. view: {}", newViewOrNull);
+ return newViewOrNull;
+ } else {
+ // otherwise the view has changed
+ logger.info("doCheckView: view has changed from: {} to: {} - sending event...", previousClusterViewDocument,
+ newViewOrNull);
+ previousClusterViewDocument = newViewOrNull;
+ logger.debug("doCheckView: end. changed view: {}", newViewOrNull);
+ return newViewOrNull;
+ }
+ }
+ }
+
+ @Override
+ public void handleClusterStateChange() {
+ // handleClusterStateChange is needed to learn about any state change in
+ // the clusternodes
+ // collection asap and being able to react on it - so this will wake up
+ // the
+ // backgroundWorker which in turn will - in a separate thread - check
+ // the view
+ // and send out events accordingly
+ wakeupBackgroundWorker(WakeupReason.CLUSTER_STATE_CHANGED);
+ }
+
+ private void wakeupBackgroundWorker(WakeupReason wakeupReason) {
+ final BackgroundWorker bw = backgroundWorker;
+ if (bw != null) {
+ // get a copy of this.hasInstancesWithBacklog for just the code-part
+ // in this synchronized
+ boolean hasInstancesWithBacklog = this.hasInstancesWithBacklog;
+
+ if (wakeupReason == WakeupReason.BACKGROUND_READ_FINISHED) {
+ // then only forward the notify if' hasInstancesWithBacklog'
+ // ie, we have anything we could be waiting for - otherwise
+ // we dont need to wakeup the background thread
+ if (!hasInstancesWithBacklog) {
+ logger.trace(
+ "wakeupBackgroundWorker: not waking up backgroundWorker, as we do not have any instances with backlog");
+ return;
+ }
+ }
+ logger.trace("wakeupBackgroundWorker: waking up backgroundWorker, reason: {} (hasInstancesWithBacklog: {})",
+ wakeupReason, hasInstancesWithBacklog);
+ synchronized (bw) {
+ bw.notifyAll();
+ }
+ }
+ }
+
+ /**
+ * <p>
+ * Additionally the DocumentDiscoveryLiteService must be notified when the
+ * background-read has finished - as it could be waiting for a crashed
+ * node's recovery to finish - which it can only do by checking the
+ * lastKnownRevision of the crashed instance - and that check is best done
+ * after the background read is just finished (it could optinoally do that
+ * just purely time based as well, but going via a listener is more timely,
+ * that's why this approach has been chosen).
+ */
+ @Override
+ public void contentChanged(NodeState root, CommitInfo info) {
+ // contentChanged is only used to react as quickly as possible
+ // when we have instances that have a 'backlog' - ie when instances
+ // crashed
+ // and are being recovered - then we must wait until the recovery is
+ // finished
+ // AND until the subsequent background read actually reads that
+ // instance'
+ // last changes. To catch that moment as quickly as possible,
+ // this contentChanged is used.
+ // Now from the above it also results that this only wakes up the
+ // backgroundWorker if we have any pending 'backlogy instances'
+ // otherwise this is a no-op
+ if (info == null) {
+ // then ignore this as this is likely an external change
+ // note: it could be a compacted change, in which case we should
+ // probably still process it - but we have a 5sec fallback
+ // in the BackgroundWorker to handle that case too,
+ // so:
+ logger.trace("contentChanged: ignoring content change due to commit info being null");
+ return;
+ }
+ logger.trace("contentChanged: handling content changed by waking up worker if necessary");
+ wakeupBackgroundWorker(WakeupReason.BACKGROUND_READ_FINISHED);
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1697355&r1=1697354&r2=1697355&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Mon Aug 24 11:25:48 2015
@@ -348,6 +348,16 @@ public final class DocumentNodeStore
private final BlobStore blobStore;
/**
+ * The clusterStateChangeListener is invoked on any noticed change in the
+ * clusterNodes collection.
+ * <p>
+ * Note that there is no synchronization between setting this one and using
+ * it, but arguably that is not necessary since it will be set at startup
+ * time and then never be changed.
+ */
+ private ClusterStateChangeListener clusterStateChangeListener;
+
+ /**
* The BlobSerializer.
*/
private final BlobSerializer blobSerializer = new BlobSerializer() {
@@ -1637,7 +1647,8 @@ public final class DocumentNodeStore
runBackgroundReadOperations();
}
- private void runBackgroundUpdateOperations() {
+ /** Note: made package-protected for testing purpose, would otherwise be private **/
+ void runBackgroundUpdateOperations() {
if (isDisposed.get()) {
return;
}
@@ -1680,7 +1691,8 @@ public final class DocumentNodeStore
//----------------------< background read operations >----------------------
- private void runBackgroundReadOperations() {
+ /** Note: made package-protected for testing purpose, would otherwise be private **/
+ void runBackgroundReadOperations() {
if (isDisposed.get()) {
return;
}
@@ -1724,8 +1736,11 @@ public final class DocumentNodeStore
/**
* Updates the state about cluster nodes in {@link #activeClusterNodes}
* and {@link #inactiveClusterNodes}.
+ * @return true if the cluster state has changed, false if the cluster state
+ * remained unchanged
*/
- void updateClusterState() {
+ boolean updateClusterState() {
+ boolean hasChanged = false;
long now = clock.getTime();
Set<Integer> inactive = Sets.newHashSet();
for (ClusterNodeInfoDocument doc : ClusterNodeInfoDocument.all(store)) {
@@ -1733,14 +1748,15 @@ public final class DocumentNodeStore
if (cId != this.clusterId && !doc.isActive()) {
inactive.add(cId);
} else {
- activeClusterNodes.put(cId, doc.getLeaseEndTime());
+ hasChanged |= activeClusterNodes.put(cId, doc.getLeaseEndTime())==null;
}
}
- activeClusterNodes.keySet().removeAll(inactive);
- inactiveClusterNodes.keySet().retainAll(inactive);
+ hasChanged |= activeClusterNodes.keySet().removeAll(inactive);
+ hasChanged |= inactiveClusterNodes.keySet().retainAll(inactive);
for (Integer clusterId : inactive) {
- inactiveClusterNodes.putIfAbsent(clusterId, now);
+ hasChanged |= inactiveClusterNodes.putIfAbsent(clusterId, now)==null;
}
+ return hasChanged;
}
/**
@@ -2436,6 +2452,16 @@ public final class DocumentNodeStore
return blobGC;
}
+ void setClusterStateChangeListener(ClusterStateChangeListener clusterStateChangeListener) {
+ this.clusterStateChangeListener = clusterStateChangeListener;
+ }
+
+ void signalClusterStateChange() {
+ if (clusterStateChangeListener != null) {
+ clusterStateChangeListener.handleClusterStateChange();
+ }
+ }
+
//-----------------------------< DocumentNodeStoreMBean >---------------------------------
public DocumentNodeStoreMBean getMBean() {
@@ -2607,8 +2633,14 @@ public final class DocumentNodeStore
 @Override
 protected void execute(@Nonnull DocumentNodeStore nodeStore) {
- if (nodeStore.renewClusterIdLease()) {
- nodeStore.updateClusterState();
+ // first renew the clusterId lease (its boolean result is not used here)
+ nodeStore.renewClusterIdLease();
+
+ // then, regardless of whether the lease actually had to be renewed,
+ // re-read the cluster node state from the clusterNodes collection:
+ if (nodeStore.updateClusterState()) {
+ // the set of active/inactive nodes changed: notify the listener, if any
+ nodeStore.signalClusterStateChange();
 }
 }
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1697355&r1=1697354&r2=1697355&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java Mon Aug 24 11:25:48 2015
@@ -460,7 +460,10 @@ public class DocumentNodeStoreService {
Dictionary<String, Object> props = new Hashtable<String, Object>();
props.put(Constants.SERVICE_PID, DocumentNodeStore.class.getName());
props.put(DESCRIPTION, getMetadata(ds));
- reg = context.getBundleContext().registerService(NodeStore.class.getName(), store, props);
+ // OAK-2844: in order to allow DocumentDiscoveryLiteService to directly
+ // require a service DocumentNodeStore (instead of having to do an 'instanceof')
+ // the registration is now done for both NodeStore and DocumentNodeStore here.
+ reg = context.getBundleContext().registerService(new String[]{NodeStore.class.getName(), DocumentNodeStore.class.getName()}, store, props);
}
@Deactivate
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java?rev=1697355&r1=1697354&r2=1697355&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java Mon Aug 24 11:25:48 2015
@@ -278,8 +278,17 @@ public class LastRevRecoveryAgent {
return recover(suspects.iterator(), clusterId);
} finally {
Utils.closeIfCloseable(suspects);
- // Relinquish the lock on the recovery for the cluster on the clusterInfo
+
+ // Relinquish the lock on the recovery for the cluster on the
+ // clusterInfo
+ // TODO: in case recover throws a RuntimeException (or Error..) then
+ // the recovery might have failed, yet the instance is marked
+ // as 'recovered' (by setting the state to NONE).
+ // is this really fine here? or should we not retry - or at least
+ // log the throwable?
missingLastRevUtil.releaseRecoveryLock(clusterId);
+
+ nodeStore.signalClusterStateChange();
}
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java?rev=1697355&r1=1697354&r2=1697355&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java Mon Aug 24 11:25:48 2015
@@ -214,6 +214,17 @@ class UnsavedModifications {
lastRev = null;
}
}
+ Revision writtenRootRev = pending.get("/");
+ if (writtenRootRev != null) {
+ int cid = writtenRootRev.getClusterId();
+ if (store.getDocumentStore().find(org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES, String.valueOf(cid)) != null) {
+ UpdateOp update = new UpdateOp(String.valueOf(cid), false);
+ update.equals(Document.ID, null, String.valueOf(cid));
+ update.set(ClusterNodeInfo.LAST_WRITTEN_ROOT_REV_KEY, writtenRootRev.toString());
+ store.getDocumentStore().findAndUpdate(org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES, update);
+ }
+ }
+
stats.write = clock.getTime() - time;
return stats;
}
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java?rev=1697355&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java Mon Aug 24 11:25:48 2015
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Test helper that makes it easy to build {@link ClusterView} and
+ * {@link ClusterViewDocument} instances from plain sets of instance ids.
+ **/
+class ClusterViewBuilder {
+
+ private final Set<Integer> activeIds = new HashSet<Integer>();
+ private final Set<Integer> recoveringIds = new HashSet<Integer>();
+ private final Set<Integer> backlogIds = new HashSet<Integer>();
+ private final Set<Integer> inactiveIds = new HashSet<Integer>();
+ private final long viewSeqNum;
+ private final String clusterViewId;
+ private final int myId;
+
+ /**
+ * @param viewSeqNum value stored under VIEW_SEQ_NUM_KEY by {@link #asDoc()}
+ * @param clusterViewId value stored under CLUSTER_VIEW_ID_KEY by {@link #asDoc()}
+ * @param myId id passed to ClusterView.fromDocument by {@link #asView()}
+ */
+ ClusterViewBuilder(long viewSeqNum, String clusterViewId, int myId) {
+ this.viewSeqNum = viewSeqNum;
+ this.clusterViewId = clusterViewId;
+ this.myId = myId;
+ }
+
+ /** Adds the given ids to the set of active instances. */
+ public ClusterViewBuilder active(int... instanceIds) {
+ addAll(activeIds, instanceIds);
+ return this;
+ }
+
+ /** Adds the given ids to the set of recovering instances. */
+ public ClusterViewBuilder recovering(int... instanceIds) {
+ addAll(recoveringIds, instanceIds);
+ return this;
+ }
+
+ /** Adds the given ids to the set of instances with a backlog. */
+ public ClusterViewBuilder backlogs(int... instanceIds) {
+ addAll(backlogIds, instanceIds);
+ return this;
+ }
+
+ /** Adds the given ids to the set of inactive instances. */
+ public ClusterViewBuilder inactive(int... instanceIds) {
+ addAll(inactiveIds, instanceIds);
+ return this;
+ }
+
+ /**
+ * Builds a ClusterViewDocument from the active, recovering and inactive ids
+ * (backlog ids are not stored in the document). For orientation, a persisted
+ * document looks like: "_id" : "clusterView", "seqNum" : NumberLong(1),
+ * "clusterViewId" : "882f8926-...", "active" : "1", "inactive" : null
+ */
+ public ClusterViewDocument asDoc() {
+ Document raw = new Document();
+ raw.put(ClusterViewDocument.VIEW_SEQ_NUM_KEY, viewSeqNum);
+ raw.put(ClusterViewDocument.INACTIVE_KEY, asArrayStr(inactiveIds));
+ raw.put(ClusterViewDocument.RECOVERING_KEY, asArrayStr(recoveringIds));
+ raw.put(ClusterViewDocument.ACTIVE_KEY, asArrayStr(activeIds));
+ raw.put(ClusterViewDocument.CLUSTER_VIEW_ID_KEY, clusterViewId);
+ return new ClusterViewDocument(raw);
+ }
+
+ /** Builds a ClusterView from myId, {@link #asDoc()} and the backlog ids. */
+ public ClusterView asView() {
+ return ClusterView.fromDocument(myId, asDoc(), backlogIds);
+ }
+
+ /** Adds every id in {@code ids} to {@code target}. */
+ private static void addAll(Set<Integer> target, int... ids) {
+ for (int id : ids) {
+ target.add(id);
+ }
+ }
+
+ /** Formats ids via arrayToCsv; an empty set is passed on as null. */
+ private String asArrayStr(Set<Integer> ids) {
+ return ClusterViewDocument.arrayToCsv(asArray(ids));
+ }
+
+ /** Returns the ids as an array, or null if the set is empty. */
+ private Integer[] asArray(Set<Integer> ids) {
+ return ids.isEmpty() ? null : ids.toArray(new Integer[ids.size()]);
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java
------------------------------------------------------------------------------
svn:eol-style = native