You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by st...@apache.org on 2020/08/04 11:07:07 UTC
svn commit: r1880565 - in /jackrabbit/oak/trunk/oak-store-document/src:
main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java
Author: stefanegli
Date: Tue Aug 4 11:07:07 2020
New Revision: 1880565
URL: http://svn.apache.org/viewvc?rev=1880565&view=rev
Log:
OAK-9149 : reverting svn rev 1880432
Modified:
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1880565&r1=1880564&r2=1880565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Tue Aug 4 11:07:07 2020
@@ -48,7 +48,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -127,7 +126,6 @@ import org.apache.jackrabbit.oak.commons
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.jetbrains.annotations.TestOnly;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -324,8 +322,6 @@ public final class DocumentNodeStore
*/
private Thread backgroundUpdateThread;
- BackgroundUpdateOperation backgroundUpdateOperation;
-
/**
* Monitor object to synchronize background writes.
*/
@@ -691,7 +687,7 @@ public final class DocumentNodeStore
"DocumentNodeStore background read thread " + threadNamePostfix);
backgroundReadThread.setDaemon(true);
backgroundUpdateThread = new Thread(
- backgroundUpdateOperation = new BackgroundUpdateOperation(this, isDisposed),
+ new BackgroundUpdateOperation(this, isDisposed),
"DocumentNodeStore background update thread " + threadNamePostfix);
backgroundUpdateThread.setDaemon(true);
backgroundSweepThread = new Thread(
@@ -2382,10 +2378,7 @@ public final class DocumentNodeStore
private void backgroundSplit() {
Set<Path> invalidatedPaths = new HashSet<>();
- Set<Path> pathsToinvalidate = new HashSet<>();
RevisionVector head = getHeadRevision();
- List<UpdateOp> splitOps = new LinkedList<>();
- List<String> removeCandiates = new LinkedList<>();
for (Iterator<String> it = splitCandidates.keySet().iterator(); it.hasNext();) {
String id = it.next();
NodeDocument doc = store.find(Collection.NODES, id);
@@ -2399,65 +2392,43 @@ public final class DocumentNodeStore
// already has a pending _lastRev update or an invalidation
// entry was already added in this backgroundSplit() call
if (unsavedLastRevisions.get(path) == null
- && !invalidatedPaths.contains(path)) {
- pathsToinvalidate.add(path);
+ && invalidatedPaths.add(path)) {
+ // create journal entry for cache invalidation
+ JournalEntry entry = JOURNAL.newDocument(getDocumentStore());
+ entry.modified(path);
+ Revision r = newRevision().asBranchRevision();
+ UpdateOp journalOp = entry.asUpdateOp(r);
+ if (store.create(JOURNAL, singletonList(journalOp))) {
+ changes.invalidate(singletonList(r));
+ LOG.debug("Journal entry {} created for split of document {}",
+ journalOp.getId(), path);
+ } else {
+ String msg = "Unable to create journal entry " +
+ journalOp.getId() + " for document invalidation. " +
+ "Will be retried with next background split " +
+ "operation.";
+ throw new DocumentStoreException(msg);
+ }
+ }
+ // apply the split operations
+ NodeDocument before = null;
+ if (!op.isNew() ||
+ !store.create(Collection.NODES, Collections.singletonList(op))) {
+ before = store.createOrUpdate(Collection.NODES, op);
}
- splitOps.add(op);
- }
- removeCandiates.add(id);
- if (splitOps.size() >= getCreateOrUpdateBatchSize()) {
- batchSplit(splitOps, pathsToinvalidate);
- splitOps.clear();
- invalidatedPaths.addAll(pathsToinvalidate);
- pathsToinvalidate.clear();
- splitCandidates.keySet().removeAll(removeCandiates);
- removeCandiates.clear();
- }
- }
-
- if (splitOps.size() > 0) {
- batchSplit(splitOps, pathsToinvalidate);
- splitCandidates.keySet().removeAll(removeCandiates);
- }
- }
-
- private void batchSplit(List<UpdateOp> splitOps, Set<Path> pathsToinvalidate) {
- if (!pathsToinvalidate.isEmpty()) {
- // create journal entry for cache invalidation
- JournalEntry entry = JOURNAL.newDocument(getDocumentStore());
- entry.modified(pathsToinvalidate);
- Revision r = newRevision().asBranchRevision();
- UpdateOp journalOp = entry.asUpdateOp(r);
- if (store.create(JOURNAL, singletonList(journalOp))) {
- changes.invalidate(singletonList(r));
- LOG.debug("Journal entry {} created for split of document(s) {}",
- journalOp.getId(), pathsToinvalidate);
- } else {
- String msg = "Unable to create journal entry " +
- journalOp.getId() + " for document invalidation. " +
- "Will be retried with next background split " +
- "operation.";
- throw new DocumentStoreException(msg);
- }
- }
- // apply the split operations
- List<NodeDocument> beforeList = store.createOrUpdate(Collection.NODES, splitOps);
- if (LOG.isDebugEnabled()) {
- // this is rather expensive - but given we were doing log.debug before
- // the batchSplit mechanism, so this somewhat negates the batch improvement indeed
- for (int i = 0; i < splitOps.size(); i++) {
- UpdateOp op = splitOps.get(i);
- NodeDocument before = beforeList.size() > i ? beforeList.get(i) : null;
if (before != null) {
- NodeDocument after = store.find(Collection.NODES, op.getId());
- if (after != null) {
- LOG.debug("Split operation on {}. Size before: {}, after: {}",
- op.getId(), before.getMemory(), after.getMemory());
+ if (LOG.isDebugEnabled()) {
+ NodeDocument after = store.find(Collection.NODES, op.getId());
+ if (after != null) {
+ LOG.debug("Split operation on {}. Size before: {}, after: {}",
+ id, before.getMemory(), after.getMemory());
+ }
}
} else {
LOG.debug("Split operation created {}", op.getId());
}
}
+ it.remove();
}
}
@@ -3189,23 +3160,6 @@ public final class DocumentNodeStore
}
}
- /**
- * FOR TESTING ONLY :
- * stops the backgroundUpdateThread (by overwriting its
- * isDisposed flag) and optionally waits for the thread to
- * terminate.
- * @param timeoutMillis optional amount of millis to wait for the thread to terminate at max
- * @return true if thread is no longer running
- */
- @TestOnly
- boolean stopBackgroundUpdateThread(long timeoutMillis) throws InterruptedException {
- backgroundUpdateOperation.forceStop();
- if (timeoutMillis > 0) {
- backgroundUpdateThread.join(timeoutMillis);
- }
- return !backgroundUpdateThread.isAlive();
- }
-
public DocumentNodeStoreMBean getMBean() {
return mbean;
}
@@ -3222,7 +3176,7 @@ public final class DocumentNodeStore
private static abstract class NodeStoreTask implements Runnable {
final WeakReference<DocumentNodeStore> ref;
- private AtomicBoolean isDisposed;
+ private final AtomicBoolean isDisposed;
private final Supplier<Integer> delaySupplier;
private boolean failing;
@@ -3248,10 +3202,6 @@ public final class DocumentNodeStore
this(nodeStore, isDisposed, null);
}
- void forceStop() {
- isDisposed = new AtomicBoolean(true);
- }
-
protected abstract void execute(@NotNull DocumentNodeStore nodeStore);
@Override
Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java?rev=1880565&r1=1880564&r2=1880565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java Tue Aug 4 11:07:07 2020
@@ -25,7 +25,6 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
@@ -36,7 +35,6 @@ import org.apache.jackrabbit.oak.api.Pro
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
-import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
@@ -47,11 +45,7 @@ import org.apache.jackrabbit.oak.spi.sta
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.stats.Clock;
import org.jetbrains.annotations.NotNull;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -87,118 +81,6 @@ import static org.junit.Assert.fail;
*/
public class DocumentSplitTest extends BaseDocumentMKTest {
- private static final Logger LOG = LoggerFactory.getLogger(DocumentSplitTest.class);
-
- private String createOrUpdateBatchSize;
- private boolean createOrUpdateBatchSizeIsNull;
-
- @Before
- public void backupProperty() {
- createOrUpdateBatchSize = System.getProperty("oak.documentMK.createOrUpdateBatchSize");
- if (createOrUpdateBatchSize == null) {
- createOrUpdateBatchSizeIsNull = true;
- }
- }
-
- @After
- public void restoreProperty() {
- if (createOrUpdateBatchSize != null) {
- System.setProperty("oak.documentMK.createOrUpdateBatchSize", createOrUpdateBatchSize);
- } else if (createOrUpdateBatchSizeIsNull) {
- System.clearProperty("oak.documentMK.createOrUpdateBatchSize");
- }
- }
-
- @Test
- public void largeBatchSplitTest() throws Exception {
- for(int i=1; i<21; i+=5) {
- batchSplitTest(1000, i * 1000);
- }
- }
-
- @Test
- public void mediumBatchSplitTest() throws Exception {
- batchSplitTest(50, 1000);
- }
-
- @Test
- public void smallBatchSplitTest() throws Exception {
- batchSplitTest(1, 1000);
- }
-
- private void batchSplitTest(int batchSize, int splitDocCnt) throws Exception {
- LOG.info("batchSplitTest: batchSize = " + batchSize+ ", splitDocCnt = " + splitDocCnt);
- // this tests wants to use CountingDocumentStore - hence creating a fresh DocumentMk
- // plus it wants to set the batchSize
- if (mk != null) {
- mk.dispose();
- mk = null;
- }
-
- System.setProperty("oak.documentMK.createOrUpdateBatchSize", String.valueOf(batchSize));
- DocumentMK.Builder mkBuilder = new DocumentMK.Builder();
- MemoryDocumentStore delegateStore = new MemoryDocumentStore();
- CountingDocumentStore store = new CountingDocumentStore(delegateStore);
- mkBuilder.setDocumentStore(store);
- mk = mkBuilder.open();
- DocumentNodeStore ns = mk.getNodeStore();
- assertTrue(ns.stopBackgroundUpdateThread(10000));
- assertEquals(batchSize, ns.getCreateOrUpdateBatchSize());
-
- NodeBuilder builder = ns.getRoot().builder();
- for(int child = 0; child < 100; child++) {
- builder.child("testchild-" + child);
- }
- ns.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
-
- for(int i=0; i<2; i++) {
- builder = ns.getRoot().builder();
- for(int child = 0; child < splitDocCnt; child++) {
- PropertyState binary = binaryProperty("prop", randomBytes(5 * 1024));
- builder.child("testchild-" + child).setProperty(binary);
- }
- ns.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
- }
- store.resetCounters();
- ns.runBackgroundUpdateOperations();
- int createOrUpdates = store.getNumCreateOrUpdateCalls(NODES);
- int minBatchSplitCalls = Math.max(1, splitDocCnt / Math.max(1, batchSize / 2));
- int maxBatchSplitCalls = minBatchSplitCalls + Math.max(1, splitDocCnt % batchSize);
- // backgroundWrite could issue another 2 writes, so:
- maxBatchSplitCalls += 2;
- assertTrue("batchSize = " + batchSize
- + ", splitDocCnt = " + splitDocCnt
- + ", minBatchSplitCalls=" + minBatchSplitCalls
- + ", createOrUpdates=" + createOrUpdates,
- minBatchSplitCalls <= createOrUpdates);
- assertTrue("batchSize = " + batchSize
- + ", splitDocCnt = " + splitDocCnt
- + ", minBatchSplitCalls=" + minBatchSplitCalls
- + ", maxBatchSplitCalls=" + maxBatchSplitCalls
- + ", createOrUpdates="+createOrUpdates,
- maxBatchSplitCalls >= createOrUpdates);
- VersionGarbageCollector gc = ns.getVersionGarbageCollector();
-
- int actualSplitDocGCCount = 0;
- long timeout = ns.getClock().getTime() + 10000;
- while(actualSplitDocGCCount != splitDocCnt && ns.getClock().getTime() < timeout) {
- VersionGCStats stats = gc.gc(1, TimeUnit.MILLISECONDS);
- actualSplitDocGCCount += stats.splitDocGCCount;
- if (actualSplitDocGCCount != splitDocCnt) {
- LOG.info("batchSplitTest: Expected " + splitDocCnt + ", actual " + actualSplitDocGCCount);
- // advance time a bit to ensure gc does clean up the split docs
- ns.getClock().waitUntil(ns.getClock().getTime() + 1000);
- ns.runBackgroundUpdateOperations();
- }
- }
-
- // make sure those splitDocCnt split docs are deleted
- assertEquals("gc not as expected: expected " + splitDocCnt
- + ", got " + actualSplitDocGCCount, splitDocCnt, actualSplitDocGCCount);
- mk.dispose();
- mk = null;
- }
-
@Test
public void splitRevisions() throws Exception {
DocumentStore store = mk.getDocumentStore();