You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2015/09/07 17:21:31 UTC
svn commit: r1701635 - in
/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment:
SegmentCompactionIT.java SegmentCompactionMBean.java
Author: mduerig
Date: Mon Sep 7 15:21:30 2015
New Revision: 1701635
URL: http://svn.apache.org/r1701635
Log:
OAK-2849: Improve revision gc on SegmentMK
Add option to run compaction exclusively wrt. concurrent writers
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionMBean.java
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java?rev=1701635&r1=1701634&r2=1701635&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java Mon Sep 7 15:21:30 2015
@@ -56,6 +56,8 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanRegistrationException;
@@ -139,6 +141,7 @@ public class SegmentCompactionIT {
private Registration mBeanRegistration;
private volatile ListenableFuture<?> compactor = immediateCancelledFuture();
+ private volatile ReadWriteLock compactionLock = null;
private volatile int lockWaitTime = 60;
private volatile int maxReaders = 10;
private volatile int maxWriters = 10;
@@ -428,23 +431,42 @@ public class SegmentCompactionIT {
cancelled = true;
}
- @Override
- public Void call() throws IOException, CommitFailedException {
- NodeBuilder root = nodeStore.getRoot().builder();
- for (int k = 0; k < opCount && !cancelled; k++) {
- modify(nodeStore, root);
- }
- if (!cancelled) {
+ private <T> T run(Callable<T> thunk) throws Exception {
+ ReadWriteLock lock = compactionLock;
+ if (lock != null) {
+ lock.readLock().lock();
try {
- CommitHook commitHook = rnd.nextBoolean()
- ? new CompositeHook(new ConflictHook(DefaultConflictHandler.OURS))
- : new CompositeHook(new ConflictHook(DefaultConflictHandler.THEIRS));
- nodeStore.merge(root, commitHook, CommitInfo.EMPTY);
- } catch (CommitFailedException e) {
- LOG.warn("Commit failed: {}", e.getMessage());
+ return thunk.call();
+ } finally {
+ lock.readLock().unlock();
}
+ } else {
+ return thunk.call();
}
- return null;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ return run(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ NodeBuilder root = nodeStore.getRoot().builder();
+ for (int k = 0; k < opCount && !cancelled; k++) {
+ modify(nodeStore, root);
+ }
+ if (!cancelled) {
+ try {
+ CommitHook commitHook = rnd.nextBoolean()
+ ? new CompositeHook(new ConflictHook(DefaultConflictHandler.OURS))
+ : new CompositeHook(new ConflictHook(DefaultConflictHandler.THEIRS));
+ nodeStore.merge(root, commitHook, CommitInfo.EMPTY);
+ } catch (CommitFailedException e) {
+ LOG.warn("Commit failed: {}", e.getMessage());
+ }
+ }
+ return null;
+ }
+ });
}
private void modify(NodeStore nodeStore, NodeBuilder nodeBuilder) throws IOException {
@@ -621,7 +643,7 @@ public class SegmentCompactionIT {
}
}
- private static class Compactor implements Runnable {
+ private class Compactor implements Runnable {
private final FileStore fileStore;
private final TestGCMonitor gcMonitor;
@@ -630,12 +652,36 @@ public class SegmentCompactionIT {
this.gcMonitor = gcMonitor;
}
+ private <T> T run(Callable<T> thunk) throws Exception {
+ ReadWriteLock lock = compactionLock;
+ if (lock != null) {
+ lock.writeLock().lock();
+ try {
+ return thunk.call();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ } else {
+ return thunk.call();
+ }
+ }
+
@Override
public void run() {
if (gcMonitor.isCleaned()) {
LOG.info("Running compaction");
- gcMonitor.resetCleaned();
- fileStore.maybeCompact(true);
+ try {
+ run(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ gcMonitor.resetCleaned();
+ fileStore.maybeCompact(true);
+ return null;
+ }
+ });
+ } catch (Exception e) {
+ LOG.error("Error while running compaction", e);
+ }
} else {
LOG.info("Not running compaction as no cleanup has taken place");
}
@@ -742,6 +788,20 @@ public class SegmentCompactionIT {
}
@Override
+ public void setUseCompactionLock(boolean value) {
+ if (!value) { // disabling: drop the lock, tasks then run unguarded
+ compactionLock = null;
+ } else if (compactionLock == null) { // enabling: keep an already installed lock
+ compactionLock = new ReentrantReadWriteLock();
+ }
+ }
+
+ @Override
+ public boolean getUseCompactionLock() {
+ return compactionLock != null;
+ }
+
+ @Override
public void setLockWaitTime(int seconds) {
lockWaitTime = seconds;
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionMBean.java?rev=1701635&r1=1701634&r2=1701635&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionMBean.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionMBean.java Mon Sep 7 15:21:30 2015
@@ -60,6 +60,17 @@ public interface SegmentCompactionMBean
String getLastCompaction();
/**
+ * Determine whether compaction should run exclusively wrt. concurrent writers.
+ * @param value run compaction exclusively iff {@code true}
+ */
+ void setUseCompactionLock(boolean value);
+
+ /**
+ * @return Compaction runs exclusively wrt. concurrent writers iff {@code true}
+ */
+ boolean getUseCompactionLock();
+
+ /**
* Time to wait for the commit lock for committing the compacted head.
* @param seconds number of seconds to wait
* @see SegmentNodeStore#locked(java.util.concurrent.Callable, long, java.util.concurrent.TimeUnit)