You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2015/09/07 17:21:31 UTC

svn commit: r1701635 - in /jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment: SegmentCompactionIT.java SegmentCompactionMBean.java

Author: mduerig
Date: Mon Sep  7 15:21:30 2015
New Revision: 1701635

URL: http://svn.apache.org/r1701635
Log:
OAK-2849: Improve revision gc on SegmentMK
Add option to run compaction exclusively wrt. concurrent writers

Modified:
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionMBean.java

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java?rev=1701635&r1=1701634&r2=1701635&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java Mon Sep  7 15:21:30 2015
@@ -56,6 +56,8 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.management.InstanceAlreadyExistsException;
 import javax.management.MBeanRegistrationException;
@@ -139,6 +141,7 @@ public class SegmentCompactionIT {
     private Registration mBeanRegistration;
 
     private volatile ListenableFuture<?> compactor = immediateCancelledFuture();
+    private volatile ReadWriteLock compactionLock = null;
     private volatile int lockWaitTime = 60;
     private volatile int maxReaders = 10;
     private volatile int maxWriters = 10;
@@ -428,23 +431,42 @@ public class SegmentCompactionIT {
             cancelled = true;
         }
 
-        @Override
-        public Void call() throws IOException, CommitFailedException {
-            NodeBuilder root = nodeStore.getRoot().builder();
-            for (int k = 0; k < opCount && !cancelled; k++) {
-                modify(nodeStore, root);
-            }
-            if (!cancelled) {
+        private <T> T run(Callable<T> thunk) throws Exception {
+            ReadWriteLock lock = compactionLock;
+            if (lock != null) {
+                lock.readLock().lock();
                 try {
-                    CommitHook commitHook = rnd.nextBoolean()
-                        ? new CompositeHook(new ConflictHook(DefaultConflictHandler.OURS))
-                        : new CompositeHook(new ConflictHook(DefaultConflictHandler.THEIRS));
-                    nodeStore.merge(root, commitHook, CommitInfo.EMPTY);
-                } catch (CommitFailedException e) {
-                    LOG.warn("Commit failed: {}", e.getMessage());
+                    return thunk.call();
+                } finally {
+                    lock.readLock().unlock();
                 }
+            } else {
+                return thunk.call();
             }
-            return null;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            return run(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    NodeBuilder root = nodeStore.getRoot().builder();
+                    for (int k = 0; k < opCount && !cancelled; k++) {
+                        modify(nodeStore, root);
+                    }
+                    if (!cancelled) {
+                        try {
+                            CommitHook commitHook = rnd.nextBoolean()
+                                    ? new CompositeHook(new ConflictHook(DefaultConflictHandler.OURS))
+                                    : new CompositeHook(new ConflictHook(DefaultConflictHandler.THEIRS));
+                            nodeStore.merge(root, commitHook, CommitInfo.EMPTY);
+                        } catch (CommitFailedException e) {
+                            LOG.warn("Commit failed: {}", e.getMessage());
+                        }
+                    }
+                    return null;
+                }
+            });
         }
 
         private void modify(NodeStore nodeStore, NodeBuilder nodeBuilder) throws IOException {
@@ -621,7 +643,7 @@ public class SegmentCompactionIT {
         }
     }
 
-    private static class Compactor implements Runnable {
+    private class Compactor implements Runnable {
         private final FileStore fileStore;
         private final TestGCMonitor gcMonitor;
 
@@ -630,12 +652,36 @@ public class SegmentCompactionIT {
             this.gcMonitor = gcMonitor;
         }
 
+        private <T> T run(Callable<T> thunk) throws Exception {
+            ReadWriteLock lock = compactionLock;
+            if (lock != null) {
+                lock.writeLock().lock();
+                try {
+                    return thunk.call();
+                } finally {
+                    lock.writeLock().unlock();
+                }
+            } else {
+                return thunk.call();
+            }
+        }
+
         @Override
         public void run() {
             if (gcMonitor.isCleaned()) {
                 LOG.info("Running compaction");
-                gcMonitor.resetCleaned();
-                fileStore.maybeCompact(true);
+                try {
+                    run(new Callable<Void>() {
+                        @Override
+                        public Void call() throws Exception {
+                            gcMonitor.resetCleaned();
+                            fileStore.maybeCompact(true);
+                            return null;
+                        }
+                    });
+                } catch (Exception e) {
+                    LOG.error("Error while running compaction", e);
+                }
             } else {
                 LOG.info("Not running compaction as no cleanup has taken place");
             }
@@ -742,6 +788,20 @@ public class SegmentCompactionIT {
         }
 
         @Override
+        public void setUseCompactionLock(boolean value) {
+            if (value && compactionLock == null) {
+                compactionLock = new ReentrantReadWriteLock();
+            } else {
+                compactionLock = null;
+            }
+        }
+
+        @Override
+        public boolean getUseCompactionLock() {
+            return compactionLock != null;
+        }
+
+        @Override
         public void setLockWaitTime(int seconds) {
             lockWaitTime = seconds;
         }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionMBean.java?rev=1701635&r1=1701634&r2=1701635&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionMBean.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionMBean.java Mon Sep  7 15:21:30 2015
@@ -60,6 +60,17 @@ public interface SegmentCompactionMBean
     String getLastCompaction();
 
     /**
+     * Determine whether to compaction should run exclusively wrt. concurrent writers.
+     * @param value  run compaction exclusively iff {@code true}
+     */
+    void setUseCompactionLock(boolean value);
+
+    /**
+     * @return  Compaction runs exclusively wrt. concurrent writers iff {@code true}
+     */
+    boolean getUseCompactionLock();
+
+    /**
      * Time to wait for the commit lock for committing the compacted head.
      * @param seconds  number of seconds to wait
      * @see SegmentNodeStore#locked(java.util.concurrent.Callable, long, java.util.concurrent.TimeUnit)