You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2015/06/17 11:10:22 UTC
svn commit: r1685964 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/document/
test/java/org/apache/jackrabbit/oak/plugins/document/
Author: mreutegg
Date: Wed Jun 17 09:10:22 2015
New Revision: 1685964
URL: http://svn.apache.org/r1685964
Log:
OAK-2829: Comparing node states for external changes is too slow
Reformat code, reduce READ_CHUNK_SIZE to 100, tweak exception handling in background read
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1685964&r1=1685963&r2=1685964&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Wed Jun 17 09:10:22 2015
@@ -1784,12 +1784,13 @@ public final class DocumentNodeStore
externalChanges.put(r, otherSeen);
}
// collect external changes
- if (last != null) {
- // add changes for this particular clusterId to the externalSort
- try {
+ if (last != null && externalSort != null) {
+ // add changes for this particular clusterId to the externalSort
+ try {
fillExternalChanges(externalSort, last, r, store);
} catch (IOException e1) {
LOG.error("backgroundRead: Exception while reading external changes from journal: "+e1, e1);
+ externalSort = null;
}
}
}
@@ -1825,9 +1826,10 @@ public final class DocumentNodeStore
setHeadRevision(newRevision());
if (dispatchChange) {
time = clock.getTime();
- if (externalSort!=null) {
- // then there were external changes - apply them to the diff cache
- try {
+ if (externalSort != null) {
+ // then there were external changes and reading them
+ // was successful -> apply them to the diff cache
+ try {
JournalEntry.applyTo(externalSort, diffCache, oldHead, headRevision);
} catch (Exception e1) {
LOG.error("backgroundRead: Exception while processing external changes from journal: "+e1, e1);
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java?rev=1685964&r1=1685963&r2=1685964&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java Wed Jun 17 09:10:22 2015
@@ -16,7 +16,6 @@
*/
package org.apache.jackrabbit.oak.plugins.document;
-
import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
@@ -60,7 +59,7 @@ import static org.apache.jackrabbit.oak.
* Push changes to {@link MemoryDiffCache} instead of {@link LocalDiffCache}.
* See {@link TieredDiffCache#newEntry(Revision, Revision)}. Maybe a new
* method is needed for this purpose?
- * Done (incl junit)
+ * Done (incl junit)
* Create JournalEntry for external changes related to _lastRev recovery.
* See {@link LastRevRecoveryAgent#recover(Iterator, int, boolean)}.
* Done (incl junit)
@@ -85,10 +84,13 @@ public final class JournalEntry extends
private static final String BRANCH_COMMITS = "_bc";
- private static final int READ_CHUNK_SIZE = 1024;
-
- private static final int STRINGSORT_OVERFLOW_TO_DISK_THRESHOLD = 1024 * 1024; // switch to disk after 1MB
-
+ private static final int READ_CHUNK_SIZE = 100;
+
+ /**
+ * switch to disk after 1MB
+ */
+ private static final int STRINGSORT_OVERFLOW_TO_DISK_THRESHOLD = 1024 * 1024;
+
private final DocumentStore store;
private volatile TreeNode changes = null;
@@ -96,7 +98,7 @@ public final class JournalEntry extends
JournalEntry(DocumentStore store) {
this.store = store;
}
-
+
static StringSort newSorter() {
return new StringSort(STRINGSORT_OVERFLOW_TO_DISK_THRESHOLD, new Comparator<String>() {
@Override
@@ -105,128 +107,132 @@ public final class JournalEntry extends
}
});
}
-
+
static void applyTo(@Nonnull StringSort externalSort,
- @Nonnull DiffCache diffCache,
- @Nonnull Revision from,
- @Nonnull Revision to) throws IOException {
+ @Nonnull DiffCache diffCache,
+ @Nonnull Revision from,
+ @Nonnull Revision to) throws IOException {
LOG.debug("applyTo: starting for {} to {}", from, to);
- externalSort.sort();
- // note that it is not deduplicated yet
- LOG.debug("applyTo: sorting done.");
-
- final DiffCache.Entry entry = checkNotNull(diffCache).newEntry(from, to, false);
-
- final Iterator<String> it = externalSort.getIds();
- if (!it.hasNext()) {
- // nothing at all? that's quite unusual..
-
- // we apply this diff as one '/' to the entry then
- entry.append("/", "");
- entry.done();
- return;
- }
- String previousPath = it.next();
- TreeNode node = new TreeNode(null, "");
- node = node.getOrCreatePath(previousPath);
- int totalCnt = 0;
- int deDuplicatedCnt = 0;
- while(it.hasNext()) {
- totalCnt++;
- final String currentPath = it.next();
- if (previousPath.equals(currentPath)) {
- // de-duplication
- continue;
- }
-
- // 'node' contains one hierarchy line, eg /a, /a/b, /a/b/c, /a/b/c/d
- // including the children on each level.
- // these children have not yet been appended to the diffCache entry
- // and have to be added as soon as the 'currentPath' is not
- // part of that hierarchy anymore and we 'move elsewhere'.
- // eg if 'currentPath' is /a/b/e, then we must flush /a/b/c/d and /a/b/c
- while(node!=null && !node.isParentOf(currentPath)) {
- // add parent to the diff entry
- entry.append(node.getPath(), getChanges(node));
- deDuplicatedCnt++;
- node = node.parent;
- }
-
- if (node==null) {
- // we should never go 'passed' the root, hence node should
- // never be null - if it becomes null anyway, start with
- // a fresh root:
- node = new TreeNode(null, "");
- node = node.getOrCreatePath(currentPath);
- } else {
- // this is the normal route: we add a direct or grand-child
- // node to the current node:
- node = node.getOrCreatePath(currentPath);
- }
- previousPath = currentPath;
- }
-
- // once we're done we still have the last hierarchy line contained in 'node',
- // eg /x, /x/y, /x/y/z
- // and that one we must now append to the diffcache entry:
- while(node!=null) {
+ externalSort.sort();
+ // note that it is not de-duplicated yet
+ LOG.debug("applyTo: sorting done.");
+
+ final DiffCache.Entry entry = checkNotNull(diffCache).newEntry(from, to, false);
+
+ final Iterator<String> it = externalSort.getIds();
+ if (!it.hasNext()) {
+ // nothing at all? that's quite unusual..
+
+ // we apply this diff as one '/' to the entry then
+ entry.append("/", "");
+ entry.done();
+ return;
+ }
+ String previousPath = it.next();
+ TreeNode node = new TreeNode(null, "");
+ node = node.getOrCreatePath(previousPath);
+ int totalCnt = 0;
+ int deDuplicatedCnt = 0;
+ while (it.hasNext()) {
+ totalCnt++;
+ final String currentPath = it.next();
+ if (previousPath.equals(currentPath)) {
+ // de-duplication
+ continue;
+ }
+
+ // 'node' contains one hierarchy line, eg /a, /a/b, /a/b/c, /a/b/c/d
+ // including the children on each level.
+ // these children have not yet been appended to the diffCache entry
+ // and have to be added as soon as the 'currentPath' is not
+ // part of that hierarchy anymore and we 'move elsewhere'.
+ // eg if 'currentPath' is /a/b/e, then we must flush /a/b/c/d and /a/b/c
+ while (node != null && !node.isParentOf(currentPath)) {
+ // add parent to the diff entry
+ entry.append(node.getPath(), getChanges(node));
+ deDuplicatedCnt++;
+ node = node.parent;
+ }
+
+ if (node == null) {
+ // we should never go 'past' the root, hence node should
+ // never be null - if it becomes null anyway, start with
+ // a fresh root:
+ node = new TreeNode(null, "");
+ node = node.getOrCreatePath(currentPath);
+ } else {
+ // this is the normal route: we add a direct or grand-child
+ // node to the current node:
+ node = node.getOrCreatePath(currentPath);
+ }
+ previousPath = currentPath;
+ }
+
+ // once we're done we still have the last hierarchy line contained in 'node',
+ // eg /x, /x/y, /x/y/z
+ // and that one we must now append to the diffcache entry:
+ while (node != null) {
entry.append(node.getPath(), getChanges(node));
- deDuplicatedCnt++;
- node = node.parent;
- }
-
- // and finally: mark the diffcache entry as 'done':
+ deDuplicatedCnt++;
+ node = node.parent;
+ }
+
+ // and finally: mark the diffcache entry as 'done':
entry.done();
LOG.debug("applyTo: done. totalCnt: {}, deDuplicatedCnt: {}", totalCnt, deDuplicatedCnt);
}
-
+
/**
- * Reads all external changes between the two given revisions (with the same clusterId)
- * from the journal and appends the paths therein to the provided sorter.
+ * Reads all external changes between the two given revisions (with the same
+ * clusterId) from the journal and appends the paths therein to the provided
+ * sorter.
*
- * @param sorter the StringSort to which all externally changed paths between
- * the provided revisions will be added
- * @param from the lower bound of the revision range (exclusive).
- * @param to the upper bound of the revision range (inclusive).
- * @param store the document store to query.
- * @throws IOException
+ * @param sorter the StringSort to which all externally changed paths
+ * between the provided revisions will be added
+ * @param from the lower bound of the revision range (exclusive).
+ * @param to the upper bound of the revision range (inclusive).
+ * @param store the document store to query.
+ * @throws IOException
*/
static void fillExternalChanges(@Nonnull StringSort sorter,
@Nonnull Revision from,
@Nonnull Revision to,
- @Nonnull DocumentStore store) throws IOException {
+ @Nonnull DocumentStore store)
+ throws IOException {
checkArgument(checkNotNull(from).getClusterId() == checkNotNull(to).getClusterId());
-
+
// to is inclusive, but DocumentStore.query() toKey is exclusive
final String inclusiveToId = asId(to);
to = new Revision(to.getTimestamp(), to.getCounter() + 1,
to.getClusterId(), to.isBranch());
- // read in chunks to support very large sets of changes between subsequent background reads
- // to do this, provide a (TODO eventually configurable) limit for the number of entries to be returned per query
- // if the number of elements returned by the query is exactly the provided limit, then
- // loop and do subsequent queries
+ // read in chunks to support very large sets of changes between
+ // subsequent background reads. To do this, provide a (TODO eventually configurable)
+ // limit for the number of entries to be returned per query. If the
+ // number of elements returned by the query is exactly the provided
+ // limit, then loop and do subsequent queries
final String toId = asId(to);
String fromId = asId(from);
- while(true) {
- if (fromId.equals(inclusiveToId)) {
- // avoid query if from and to are off by just 1 counter (which we do due to exclusiveness of query borders)
- // as in this case the query will always be empty anyway - so avoid doing the query in the first place
- break;
- }
- List<JournalEntry> partialResult = store.query(JOURNAL, fromId, toId, READ_CHUNK_SIZE);
- if (partialResult==null) {
- break;
- }
- for(JournalEntry d: partialResult) {
- d.addTo(sorter);
- }
- if (partialResult.size()<READ_CHUNK_SIZE) {
- break;
- }
- // otherwise set 'fromId' to the last entry just processed
- // that works fine as the query is non-inclusive (ie does not include the from which we'd otherwise double-process)
- fromId = partialResult.get(partialResult.size()-1).getId();
+ while (true) {
+ if (fromId.equals(inclusiveToId)) {
+ // avoid query if from and to are off by just 1 counter (which
+ // we do due to exclusiveness of query borders) as in this case
+ // the query will always be empty anyway - so avoid doing the
+ // query in the first place
+ break;
+ }
+ List<JournalEntry> partialResult = store.query(JOURNAL, fromId, toId, READ_CHUNK_SIZE);
+
+ for (JournalEntry d : partialResult) {
+ d.addTo(sorter);
+ }
+ if (partialResult.size() < READ_CHUNK_SIZE) {
+ break;
+ }
+ // otherwise set 'fromId' to the last entry just processed
+ // that works fine as the query is non-inclusive (ie does not
+ // include the from which we'd otherwise double-process)
+ fromId = partialResult.get(partialResult.size() - 1).getId();
}
}
@@ -281,17 +287,17 @@ public final class JournalEntry extends
}
return op;
}
-
+
void addTo(final StringSort sort) throws IOException {
TreeNode n = getChanges();
TraversingVisitor v = new TraversingVisitor() {
-
+
@Override
public void node(TreeNode node, String path) throws IOException {
sort.add(path);
}
};
- n.accept(v, "/");
+ n.accept(v, "/");
for (JournalEntry e : getBranchCommits()) {
e.getChanges().accept(v, "/");
}
@@ -372,21 +378,21 @@ public final class JournalEntry extends
private final String path;
private final TreeNode parent;
-
+
TreeNode(TreeNode parent, String name) {
if (name.contains("/")) {
- throw new IllegalArgumentException("name must not contain /: "+name);
+ throw new IllegalArgumentException("name must not contain /: " + name);
}
this.parent = parent;
- if (parent==null) {
+ if (parent == null) {
this.path = "/";
- } else if (parent.parent==null) {
+ } else if (parent.parent == null) {
this.path = "/" + name;
} else {
this.path = parent.path + "/" + name;
}
}
-
+
public TreeNode getOrCreatePath(String path) {
if (path.equals(this.path)) {
// then path denotes the same as myself, hence return myself
@@ -394,14 +400,14 @@ public final class JournalEntry extends
}
if (!path.startsWith(this.path)) {
// this must never happen
- throw new IllegalStateException("path not child of myself. path: "+path+", myself: "+this.path);
+ throw new IllegalStateException("path not child of myself. path: " + path + ", myself: " + this.path);
}
- String sub = this.path.equals("/") ? path.substring(1) : path.substring(this.path.length()+1);
+ String sub = this.path.equals("/") ? path.substring(1) : path.substring(this.path.length() + 1);
String[] parts = sub.split("/");
TreeNode n = this;
- for (int i = 0; i < parts.length; i++) {
- if (parts[i]!=null && parts[i].length()>0) {
- n = n.getOrCreate(parts[i]);
+ for (String part : parts) {
+ if (part != null && part.length() > 0) {
+ n = n.getOrCreate(part);
}
}
return n;
@@ -412,12 +418,12 @@ public final class JournalEntry extends
// root is parent of everything
return true;
}
- if (!path.startsWith(this.path+"/")) {
+ if (!path.startsWith(this.path + "/")) {
// then I'm not parent of that path
return false;
}
- final String sub = path.substring(this.path.length()+1);
- if (sub.indexOf("/", 1)!=-1) {
+ final String sub = path.substring(this.path.length() + 1);
+ if (sub.indexOf("/", 1) != -1) {
// if the 'sub' part contains a / then
// it is not a direct child of myself,
// so I'm a grand-parent but not a direct-parent
@@ -476,6 +482,7 @@ public final class JournalEntry extends
}
}
+ @Nonnull
private TreeNode getOrCreate(String name) {
TreeNode c = children.get(name);
if (c == null) {
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java?rev=1685964&r1=1685963&r2=1685964&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java Wed Jun 17 09:10:22 2015
@@ -16,11 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.jackrabbit.oak.plugins.document;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -30,19 +28,19 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Stopwatch;
/**
- * The JournalGarbageCollector can clean up JournalEntries that are
- * older than a particular age.
- * <p>
+ * The JournalGarbageCollector can clean up JournalEntries that are older than a
+ * particular age.
+ * <p/>
* It would typically be invoked in conjunction with the VersionGarbageCollector
* but must not be confused with that one - 'journal' refers to the separate
- * collection that contains changed paths per background writes used for
+ * collection that contains changed paths per background writes used for
* observation.
*/
public class JournalGarbageCollector {
- //copied from VersionGarbageCollector:
+ //copied from VersionGarbageCollector:
private static final int DELETE_BATCH_SIZE = 450;
-
+
private final DocumentStore ds;
private static final Logger log = LoggerFactory.getLogger(JournalGarbageCollector.class);
@@ -52,9 +50,11 @@ public class JournalGarbageCollector {
}
/**
- * Deletes entries in the journal that are older than the given maxRevisionAge.
+ * Deletes entries in the journal that are older than the given
+ * maxRevisionAge.
+ *
* @param maxRevisionAge entries older than this age will be removed
- * @param unit the timeunit for maxRevisionAge
+ * @param unit the timeunit for maxRevisionAge
* @return the number of entries that have been removed
*/
public int gc(long maxRevisionAge, TimeUnit unit) {
@@ -63,7 +63,7 @@ public class JournalGarbageCollector {
log.debug("gc: Journal garbage collection starts with maxAge: {} min.", TimeUnit.MILLISECONDS.toMinutes(maxRevisionAgeInMillis));
}
Stopwatch sw = Stopwatch.createStarted();
-
+
// the journal has ids of the following format:
// 1-0000014db9aaf710-00000001
// whereas the first number is the cluster node id.
@@ -81,64 +81,61 @@ public class JournalGarbageCollector {
// if it's run on multiple concurrently, then they
// will compete at deletion, which is not optimal
// due to performance, but does not harm.
-
+
// 1. get the list of cluster node ids
final List<ClusterNodeInfoDocument> clusterNodeInfos = ClusterNodeInfoDocument.all(ds);
int numDeleted = 0;
- for (Iterator<ClusterNodeInfoDocument> it = clusterNodeInfos.iterator(); it
- .hasNext();) {
- // current algorithm is to simply look at all cluster nodes
- // irrespective of whether they are active or inactive etc.
- // this could be optimized for inactive ones: at some point, all
- // journal entries of inactive ones would have been cleaned up
- // and at that point we could stop including those long-time-inactive ones.
- // that 'long time' aspect would have to be tracked though, to be sure
- // we don't leave garbage.
- // so simpler is to quickly do a query even for long-time inactive ones
- final ClusterNodeInfoDocument clusterNodeInfoDocument = it.next();
- final int clusterNodeId = clusterNodeInfoDocument.getClusterId();
-
- // 2. iterate over that list and do a query with
- // a limit of 'batch size'
- boolean branch = false;
- long startPointer = 0;
- while(true) {
- String fromKey = JournalEntry.asId(new Revision(startPointer, 0, clusterNodeId, branch));
- String toKey = JournalEntry.asId(new Revision(
- System.currentTimeMillis() - maxRevisionAgeInMillis, Integer.MAX_VALUE, clusterNodeId, branch));
- int limit = DELETE_BATCH_SIZE;
- List<JournalEntry> deletionBatch = ds.query(Collection.JOURNAL, fromKey, toKey, limit);
- if (deletionBatch.size()>0) {
- ds.remove(Collection.JOURNAL, asKeys(deletionBatch));
- numDeleted+=deletionBatch.size();
- }
- if (deletionBatch.size()<limit) {
- if (!branch) {
- // do the same for branches:
- // this will start at the beginning again with branch set to true
- // and eventually finish too
- startPointer = 0;
- branch = true;
- continue;
- }
- break;
- }
- startPointer = deletionBatch.get(deletionBatch.size()-1).getRevisionTimestamp();
- }
- }
-
+ for (ClusterNodeInfoDocument clusterNodeInfoDocument : clusterNodeInfos) {
+ // current algorithm is to simply look at all cluster nodes
+ // irrespective of whether they are active or inactive etc.
+ // this could be optimized for inactive ones: at some point, all
+ // journal entries of inactive ones would have been cleaned up
+ // and at that point we could stop including those long-time-inactive ones.
+ // that 'long time' aspect would have to be tracked though, to be sure
+ // we don't leave garbage.
+ // so simpler is to quickly do a query even for long-time inactive ones
+ final int clusterNodeId = clusterNodeInfoDocument.getClusterId();
+
+ // 2. iterate over that list and do a query with
+ // a limit of 'batch size'
+ boolean branch = false;
+ long startPointer = 0;
+ while (true) {
+ String fromKey = JournalEntry.asId(new Revision(startPointer, 0, clusterNodeId, branch));
+ String toKey = JournalEntry.asId(new Revision(System.currentTimeMillis() - maxRevisionAgeInMillis, Integer.MAX_VALUE, clusterNodeId, branch));
+ int limit = DELETE_BATCH_SIZE;
+ List<JournalEntry> deletionBatch = ds.query(Collection.JOURNAL, fromKey, toKey, limit);
+ if (deletionBatch.size() > 0) {
+ ds.remove(Collection.JOURNAL, asKeys(deletionBatch));
+ numDeleted += deletionBatch.size();
+ }
+ if (deletionBatch.size() < limit) {
+ if (!branch) {
+ // do the same for branches:
+ // this will start at the beginning again with branch set to true
+ // and eventually finish too
+ startPointer = 0;
+ branch = true;
+ continue;
+ }
+ break;
+ }
+ startPointer = deletionBatch.get(deletionBatch.size() - 1).getRevisionTimestamp();
+ }
+ }
+
sw.stop();
-
+
log.info("gc: Journal garbage collection took {}, deleted {} entries that were older than {} min.", sw, numDeleted, TimeUnit.MILLISECONDS.toMinutes(maxRevisionAgeInMillis));
return numDeleted;
}
- private List<String> asKeys(List<JournalEntry> deletionBatch) {
- final List<String> keys = new ArrayList<String>(deletionBatch.size());
- for (JournalEntry e: deletionBatch) {
- keys.add(e.getId());
- }
- return keys;
- }
+ private List<String> asKeys(List<JournalEntry> deletionBatch) {
+ final List<String> keys = new ArrayList<String>(deletionBatch.size());
+ for (JournalEntry e : deletionBatch) {
+ keys.add(e.getId());
+ }
+ return keys;
+ }
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java?rev=1685964&r1=1685963&r2=1685964&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java Wed Jun 17 09:10:22 2015
@@ -140,7 +140,7 @@ public class LastRevRecoveryAgent {
//Map of known last rev of checked paths
Map<String, Revision> knownLastRevs = MapFactory.getInstance().create();
- final DocumentStore docStore = nodeStore.getDocumentStore();
+ final DocumentStore docStore = nodeStore.getDocumentStore();
final JournalEntry changes = JOURNAL.newDocument(docStore);
long count = 0;
@@ -209,11 +209,11 @@ public class LastRevRecoveryAgent {
//UnsavedModifications is designed to be used in concurrent
//access mode. For recovery case there is no concurrent access
//involve so just pass a new lock instance
-
- // the lock uses to do the persisting is a plain reentrant lock
- // thus it doesn't matter, where exactly the check is done
- // as to whether the recovered lastRev has already been
- // written to the journal.
+
+ // the lock used to do the persisting is a plain reentrant lock
+ // thus it doesn't matter, where exactly the check is done
+ // as to whether the recovered lastRev has already been
+ // written to the journal.
unsaved.persist(nodeStore, new UnsavedModifications.Snapshot() {
@Override
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java?rev=1685964&r1=1685963&r2=1685964&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java Wed Jun 17 09:10:22 2015
@@ -61,11 +61,11 @@ class TieredDiffCache implements DiffCac
@Nonnull
@Override
public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to, boolean local) {
- if (local) {
- return localCache.newEntry(from, to, true);
- } else {
- return memoryCache.newEntry(from, to, false);
- }
+ if (local) {
+ return localCache.newEntry(from, to, true);
+ } else {
+ return memoryCache.newEntry(from, to, false);
+ }
}
@Nonnull
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java?rev=1685964&r1=1685963&r2=1685964&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java Wed Jun 17 09:10:22 2015
@@ -20,186 +20,198 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nonnull;
+
import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
public class CountingDocumentStore implements DocumentStore {
-
- private DocumentStore delegate;
-
- //TODO: remove mec
- boolean printStacks;
-
- class Stats {
-
- private int numFindCalls;
- private int numQueryCalls;
- private int numRemoveCalls;
- private int numCreateOrUpdateCalls;
-
- }
-
- private Map<Collection, Stats> collectionStats = new HashMap<Collection, Stats>();
-
- public CountingDocumentStore(DocumentStore delegate) {
- this.delegate = delegate;
- }
-
- public void resetCounters() {
- collectionStats.clear();
- }
-
- public int getNumFindCalls(Collection collection) {
- return getStats(collection).numFindCalls;
- }
-
- public int getNumQueryCalls(Collection collection) {
- return getStats(collection).numQueryCalls;
- }
-
- public int getNumRemoveCalls(Collection collection) {
- return getStats(collection).numRemoveCalls;
- }
-
- public int getNumCreateOrUpdateCalls(Collection collection) {
- return getStats(collection).numCreateOrUpdateCalls;
- }
-
- private Stats getStats(Collection collection) {
- if (!collectionStats.containsKey(collection)) {
- Stats s = new Stats();
- collectionStats.put(collection, s);
- return s;
- } else {
- return collectionStats.get(collection);
- }
- }
-
- @Override
- public <T extends Document> T find(Collection<T> collection, String key) {
- getStats(collection).numFindCalls++;
- if (printStacks) {
- new Exception("find ["+getStats(collection).numFindCalls+"] ("+collection+") "+key).printStackTrace();
- }
- return delegate.find(collection, key);
- }
-
- @Override
- public <T extends Document> T find(Collection<T> collection, String key,
- int maxCacheAge) {
- getStats(collection).numFindCalls++;
- if (printStacks) {
- new Exception("find ["+getStats(collection).numFindCalls+"] ("+collection+") "+key+" [max: "+maxCacheAge+"]").printStackTrace();
- }
- return delegate.find(collection, key, maxCacheAge);
- }
-
- @Override
- public <T extends Document> List<T> query(Collection<T> collection,
- String fromKey, String toKey, int limit) {
- getStats(collection).numQueryCalls++;
- if (printStacks) {
- new Exception("query1 ["+getStats(collection).numQueryCalls+"] ("+collection+") "+fromKey+", to "+toKey+". limit "+limit).printStackTrace();
- }
- return delegate.query(collection, fromKey, toKey, limit);
- }
-
- @Override
- public <T extends Document> List<T> query(Collection<T> collection,
- String fromKey, String toKey, String indexedProperty,
- long startValue, int limit) {
- getStats(collection).numQueryCalls++;
- if (printStacks) {
- new Exception("query2 ["+getStats(collection).numQueryCalls+"] ("+collection+") "+fromKey+", to "+toKey+". limit "+limit).printStackTrace();
- }
- return delegate.query(collection, fromKey, toKey, indexedProperty, startValue, limit);
- }
-
- @Override
- public <T extends Document> void remove(Collection<T> collection, String key) {
- getStats(collection).numRemoveCalls++;
- delegate.remove(collection, key);
- }
-
- @Override
- public <T extends Document> void remove(Collection<T> collection,
- List<String> keys) {
- getStats(collection).numRemoveCalls++;
- delegate.remove(collection, keys);
- }
-
- @Override
- public <T extends Document> int remove(Collection<T> collection,
- Map<String, Map<Key, Condition>> toRemove) {
- getStats(collection).numRemoveCalls++;
- return delegate.remove(collection, toRemove);
- }
-
- @Override
- public <T extends Document> boolean create(Collection<T> collection,
- List<UpdateOp> updateOps) {
- getStats(collection).numCreateOrUpdateCalls++;
- return delegate.create(collection, updateOps);
- }
-
- @Override
- public <T extends Document> void update(Collection<T> collection,
- List<String> keys, UpdateOp updateOp) {
- getStats(collection).numCreateOrUpdateCalls++;
- delegate.update(collection, keys, updateOp);
- }
-
- @Override
- public <T extends Document> T createOrUpdate(Collection<T> collection,
- UpdateOp update) {
- getStats(collection).numCreateOrUpdateCalls++;
- return delegate.createOrUpdate(collection, update);
- }
-
- @Override
- public <T extends Document> T findAndUpdate(Collection<T> collection,
- UpdateOp update) {
- getStats(collection).numCreateOrUpdateCalls++;
- return delegate.findAndUpdate(collection, update);
- }
-
- @Override
- public CacheInvalidationStats invalidateCache() {
- return delegate.invalidateCache();
- }
-
- @Override
- public <T extends Document> void invalidateCache(Collection<T> collection,
- String key) {
- delegate.invalidateCache(collection, key);
- }
-
- @Override
- public void dispose() {
- delegate.dispose();
- }
-
- @Override
- public <T extends Document> T getIfCached(Collection<T> collection,
- String key) {
- return delegate.getIfCached(collection, key);
- }
-
- @Override
- public void setReadWriteMode(String readWriteMode) {
- delegate.setReadWriteMode(readWriteMode);
- }
-
- @Override
- public CacheStats getCacheStats() {
- return delegate.getCacheStats();
- }
-
- @Override
- public Map<String, String> getMetadata() {
- return delegate.getMetadata();
- }
+
+ private DocumentStore delegate;
+
+ //TODO: remove mec
+ boolean printStacks;
+
+ class Stats {
+
+ private int numFindCalls;
+ private int numQueryCalls;
+ private int numRemoveCalls;
+ private int numCreateOrUpdateCalls;
+
+ }
+
+ private Map<Collection, Stats> collectionStats = new HashMap<Collection, Stats>();
+
+ public CountingDocumentStore(DocumentStore delegate) {
+ this.delegate = delegate;
+ }
+
+ public void resetCounters() {
+ collectionStats.clear();
+ }
+
+ public int getNumFindCalls(Collection collection) {
+ return getStats(collection).numFindCalls;
+ }
+
+ public int getNumQueryCalls(Collection collection) {
+ return getStats(collection).numQueryCalls;
+ }
+
+ public int getNumRemoveCalls(Collection collection) {
+ return getStats(collection).numRemoveCalls;
+ }
+
+ public int getNumCreateOrUpdateCalls(Collection collection) {
+ return getStats(collection).numCreateOrUpdateCalls;
+ }
+
+ private Stats getStats(Collection collection) {
+ if (!collectionStats.containsKey(collection)) {
+ Stats s = new Stats();
+ collectionStats.put(collection, s);
+ return s;
+ } else {
+ return collectionStats.get(collection);
+ }
+ }
+
+ @Override
+ public <T extends Document> T find(Collection<T> collection, String key) {
+ getStats(collection).numFindCalls++;
+ if (printStacks) {
+ new Exception("find [" + getStats(collection).numFindCalls + "] (" + collection + ") " + key).printStackTrace();
+ }
+ return delegate.find(collection, key);
+ }
+
+ @Override
+ public <T extends Document> T find(Collection<T> collection,
+ String key,
+ int maxCacheAge) {
+ getStats(collection).numFindCalls++;
+ if (printStacks) {
+ new Exception("find [" + getStats(collection).numFindCalls + "] (" + collection + ") " + key + " [max: " + maxCacheAge + "]").printStackTrace();
+ }
+ return delegate.find(collection, key, maxCacheAge);
+ }
+
+ @Nonnull
+ @Override
+ public <T extends Document> List<T> query(Collection<T> collection,
+ String fromKey,
+ String toKey,
+ int limit) {
+ getStats(collection).numQueryCalls++;
+ if (printStacks) {
+ new Exception("query1 [" + getStats(collection).numQueryCalls + "] (" + collection + ") " + fromKey + ", to " + toKey + ". limit " + limit).printStackTrace();
+ }
+ return delegate.query(collection, fromKey, toKey, limit);
+ }
+
+ @Nonnull
+ @Override
+ public <T extends Document> List<T> query(Collection<T> collection,
+ String fromKey,
+ String toKey,
+ String indexedProperty,
+ long startValue,
+ int limit) {
+ getStats(collection).numQueryCalls++;
+ if (printStacks) {
+ new Exception("query2 [" + getStats(collection).numQueryCalls + "] (" + collection + ") " + fromKey + ", to " + toKey + ". limit " + limit).printStackTrace();
+ }
+ return delegate.query(collection, fromKey, toKey, indexedProperty, startValue, limit);
+ }
+
+ @Override
+ public <T extends Document> void remove(Collection<T> collection,
+ String key) {
+ getStats(collection).numRemoveCalls++;
+ delegate.remove(collection, key);
+ }
+
+ @Override
+ public <T extends Document> void remove(Collection<T> collection,
+ List<String> keys) {
+ getStats(collection).numRemoveCalls++;
+ delegate.remove(collection, keys);
+ }
+
+ @Override
+ public <T extends Document> int remove(Collection<T> collection,
+ Map<String, Map<Key, Condition>> toRemove) {
+ getStats(collection).numRemoveCalls++;
+ return delegate.remove(collection, toRemove);
+ }
+
+ @Override
+ public <T extends Document> boolean create(Collection<T> collection,
+ List<UpdateOp> updateOps) {
+ getStats(collection).numCreateOrUpdateCalls++;
+ return delegate.create(collection, updateOps);
+ }
+
+ @Override
+ public <T extends Document> void update(Collection<T> collection,
+ List<String> keys,
+ UpdateOp updateOp) {
+ getStats(collection).numCreateOrUpdateCalls++;
+ delegate.update(collection, keys, updateOp);
+ }
+
+ @Override
+ public <T extends Document> T createOrUpdate(Collection<T> collection,
+ UpdateOp update) {
+ getStats(collection).numCreateOrUpdateCalls++;
+ return delegate.createOrUpdate(collection, update);
+ }
+
+ @Override
+ public <T extends Document> T findAndUpdate(Collection<T> collection,
+ UpdateOp update) {
+ getStats(collection).numCreateOrUpdateCalls++;
+ return delegate.findAndUpdate(collection, update);
+ }
+
+ @Override
+ public CacheInvalidationStats invalidateCache() {
+ return delegate.invalidateCache();
+ }
+
+ @Override
+ public <T extends Document> void invalidateCache(Collection<T> collection,
+ String key) {
+ delegate.invalidateCache(collection, key);
+ }
+
+ @Override
+ public void dispose() {
+ delegate.dispose();
+ }
+
+ @Override
+ public <T extends Document> T getIfCached(Collection<T> collection,
+ String key) {
+ return delegate.getIfCached(collection, key);
+ }
+
+ @Override
+ public void setReadWriteMode(String readWriteMode) {
+ delegate.setReadWriteMode(readWriteMode);
+ }
+
+ @Override
+ public CacheStats getCacheStats() {
+ return delegate.getCacheStats();
+ }
+
+ @Override
+ public Map<String, String> getMetadata() {
+ return delegate.getMetadata();
+ }
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java?rev=1685964&r1=1685963&r2=1685964&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java Wed Jun 17 09:10:22 2015
@@ -16,45 +16,50 @@
*/
package org.apache.jackrabbit.oak.plugins.document;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
public class CountingTieredDiffCache extends TieredDiffCache {
- class CountingLoader implements Loader {
+ class CountingLoader implements Loader {
+
+ private Loader delegate;
+
+ CountingLoader(Loader delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public String call() {
+ incLoadCount();
+ return delegate.call();
+ }
+
+ }
+
+ private int loadCount;
+
+ public CountingTieredDiffCache(DocumentMK.Builder builder) {
+ super(builder);
+ }
+
+ private void incLoadCount() {
+ loadCount++;
+ }
- private Loader delegate;
+ public int getLoadCount() {
+ return loadCount;
+ }
- CountingLoader(Loader delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public String call() {
- incLoadCount();
- return delegate.call();
- }
-
- }
-
- private int loadCount;
-
- public CountingTieredDiffCache(DocumentMK.Builder builder) {
- super(builder);
- }
-
- private void incLoadCount() {
- loadCount++;
- }
-
- public int getLoadCount() {
- return loadCount;
- }
-
- public void resetLoadCounter() {
- loadCount = 0;
- }
-
- @Override
- public String getChanges(Revision from, Revision to, String path,
- Loader loader) {
- return super.getChanges(from, to, path, new CountingLoader(loader));
- }
+ public void resetLoadCounter() {
+ loadCount = 0;
+ }
+
+ @Override
+ public String getChanges(@Nonnull Revision from,
+ @Nonnull Revision to,
+ @Nonnull String path,
+ @Nullable Loader loader) {
+ return super.getChanges(from, to, path, new CountingLoader(loader));
+ }
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java?rev=1685964&r1=1685963&r2=1685964&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java Wed Jun 17 09:10:22 2015
@@ -16,12 +16,14 @@
*/
package org.apache.jackrabbit.oak.plugins.document;
+import static java.util.Arrays.asList;
+import static java.util.Collections.synchronizedList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -126,7 +128,6 @@ public class JournalTest {
incomingRootStates1.wait();
} catch (InterruptedException e) {
// ignore
- continue;
}
}
newRoot = incomingRootStates1.remove(0);
@@ -274,7 +275,7 @@ public class JournalTest {
observer.clear();
countingDocStore1.resetCounters();
countingDocStore2.resetCounters();
- countingDocStore1.printStacks = true;
+ // countingDocStore1.printStacks = true;
countingDiffCache1.resetLoadCounter();
countingDiffCache2.resetLoadCounter();
@@ -294,7 +295,7 @@ public class JournalTest {
assertEquals(0, countingDiffCache1.getLoadCount());
// let node 1 read those changes
- System.err.println("run background ops");
+ // System.err.println("run background ops");
ns1.runBackgroundOperations();
mk2.commit("/", "+\"regular5\": {}", null, null);
ns2.runBackgroundOperations();
@@ -442,8 +443,7 @@ public class JournalTest {
// just some no-ops:
recovery.recover(c2Id);
- List<NodeDocument> emptyList = new LinkedList<NodeDocument>();
- recovery.recover(emptyList.iterator(), c2Id);
+ recovery.recover(Iterators.<NodeDocument>emptyIterator(), c2Id);
assertJournalEntries(ds1, "{}", change1); // unchanged
assertJournalEntries(ds2, "{}", change2, change2b);
@@ -454,8 +454,8 @@ public class JournalTest {
final CountDownLatch ready = new CountDownLatch(NUM_THREADS);
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch end = new CountDownLatch(NUM_THREADS);
- for(int i=0; i<NUM_THREADS; i++) {
- final List<Throwable> throwables = new LinkedList<Throwable>();
+ final List<Exception> exceptions = synchronizedList(new ArrayList<Exception>());
+ for (int i = 0; i < NUM_THREADS; i++) {
Thread th = new Thread(new Runnable() {
@Override
@@ -464,10 +464,8 @@ public class JournalTest {
ready.countDown();
start.await();
recovery.recover(Iterators.forArray(x1,z1), c2Id);
- } catch (Throwable e) {
- synchronized(throwables) {
- throwables.add(e);
- }
+ } catch (Exception e) {
+ exceptions.add(e);
} finally {
end.countDown();
}
@@ -481,11 +479,14 @@ public class JournalTest {
assertTrue(end.await(20, TimeUnit.SECONDS));
assertJournalEntries(ds1, "{}", change1); // unchanged
assertJournalEntries(ds2, "{}", change2, change2b);
+ for (Exception ex : exceptions) {
+ throw ex;
+ }
}
}
void assertJournalEntries(DocumentNodeStore ds, String... expectedChanges) {
- List<String> exp = new LinkedList<String>(Arrays.asList(expectedChanges));
+ List<String> exp = new LinkedList<String>(asList(expectedChanges));
for(boolean branch : new Boolean[]{false, true}) {
String fromKey = JournalEntry.asId(new Revision(0, 0, ds.getClusterId(), branch));
String toKey = JournalEntry.asId(new Revision(System.currentTimeMillis()+1000, 0, ds.getClusterId(), branch));
@@ -494,13 +495,13 @@ public class JournalTest {
for (Iterator<JournalEntry> it = entries.iterator(); it.hasNext();) {
JournalEntry journalEntry = it.next();
if (!exp.remove(journalEntry.get("_c"))) {
- fail("Found an unexpected change: "+journalEntry.get("_c")+", while all I expected was: "+expectedChanges);
+ fail("Found an unexpected change: "+journalEntry.get("_c")+", while all I expected was: "+asList(expectedChanges));
}
}
}
}
if (exp.size()>0) {
- fail("Did not find all expected changes, left over: "+exp+" (from original list which is: "+expectedChanges+")");
+ fail("Did not find all expected changes, left over: "+exp+" (from original list which is: "+asList(expectedChanges)+")");
}
}