You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2015/04/15 16:41:03 UTC

svn commit: r1673787 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/ main/java/org/apache/jackrabbit/oak/plugins/segment/file/ test/java/org/apache/jackrabbit/oak/plugins/segment/

Author: mduerig
Date: Wed Apr 15 14:41:02 2015
New Revision: 1673787

URL: http://svn.apache.org/r1673787
Log:
OAK-2713: High memory usage of CompactionMap
IT for monitoring GC and simulating various conditions through JMX

Added:
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionMBean.java
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreGCMonitor.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java?rev=1673787&r1=1673786&r2=1673787&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java Wed Apr 15 14:41:02 2015
@@ -19,14 +19,18 @@
 
 package org.apache.jackrabbit.oak.plugins.segment.compaction;
 
+import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
 import org.apache.jackrabbit.oak.plugins.segment.CompactionMap;
 import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType;
 
-public class DefaultCompactionStrategyMBean implements CompactionStrategyMBean {
+public class DefaultCompactionStrategyMBean
+        extends AnnotatedStandardMBean
+        implements CompactionStrategyMBean {
 
     private final CompactionStrategy strategy;
 
     public DefaultCompactionStrategyMBean(CompactionStrategy strategy) {
+        super(CompactionStrategyMBean.class);
         this.strategy = strategy;
     }
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreGCMonitor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreGCMonitor.java?rev=1673787&r1=1673786&r2=1673787&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreGCMonitor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreGCMonitor.java Wed Apr 15 14:41:02 2015
@@ -31,6 +31,7 @@ import java.util.Date;
 import javax.annotation.Nonnull;
 import javax.management.openmbean.CompositeData;
 
+import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
 import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
 import org.apache.jackrabbit.oak.stats.Clock;
 import org.apache.jackrabbit.stats.TimeSeriesRecorder;
@@ -43,7 +44,8 @@ import org.apache.jackrabbit.stats.TimeS
  * second to ensure the various time series maintained by this implementation
  * are correctly aggregated.
  */
-public class FileStoreGCMonitor implements GCMonitor, GCMonitorMBean, Runnable {
+public class FileStoreGCMonitor extends AnnotatedStandardMBean
+        implements GCMonitor, GCMonitorMBean, Runnable {
     private final TimeSeriesRecorder gcCount = new TimeSeriesRecorder(true);
     private final TimeSeriesRecorder repositorySize = new TimeSeriesRecorder(false);
     private final TimeSeriesRecorder reclaimedSize = new TimeSeriesRecorder(true);
@@ -56,6 +58,7 @@ public class FileStoreGCMonitor implemen
     private String status = "NA";
 
     public FileStoreGCMonitor(@Nonnull Clock clock) {
+        super(GCMonitorMBean.class);
         this.clock = checkNotNull(clock);
     }
 

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java?rev=1673787&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java Wed Apr 15 14:41:02 2015
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.Iterables.get;
+import static com.google.common.collect.Sets.newConcurrentHashSet;
+import static com.google.common.util.concurrent.Futures.addCallback;
+import static com.google.common.util.concurrent.Futures.immediateCancelledFuture;
+import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
+import static java.io.File.createTempFile;
+import static java.lang.String.valueOf;
+import static java.lang.System.getProperty;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.commons.io.FileUtils.deleteDirectory;
+import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType.CLEAN_OLD;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.MEMORY_THRESHOLD_DEFAULT;
+import static org.apache.jackrabbit.oak.plugins.segment.file.FileStore.newFileStore;
+import static org.junit.Assume.assumeTrue;
+import static org.slf4j.helpers.MessageFormatter.arrayFormat;
+import static org.slf4j.helpers.MessageFormatter.format;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.management.ManagementFactory;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+
+import javax.annotation.Nonnull;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableScheduledFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
+import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
+import org.apache.jackrabbit.oak.plugins.segment.compaction.DefaultCompactionStrategyMBean;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStoreGCMonitor;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
+import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a longevity test for SegmentMK compaction. The test schedules a number
+ * of readers, writers, a compactor and holds some references for a certain time.
+ * All of which can be interactively modified through the accompanying
+ * {@link SegmentCompactionITMBean}, the
+ * {@link org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategyMBean} and the
+ * {@link org.apache.jackrabbit.oak.plugins.segment.file.GCMonitorMBean}.
+ *
+ * TODO Leverage longevity test support from OAK-2771 once we have it.
+ */
+public class SegmentCompactionIT {
+    /** Only run if explicitly asked to via -Dtest=SegmentCompactionIT */
+    private static final boolean ENABLED =
+            SegmentCompactionIT.class.getSimpleName().equals(getProperty("test"));
+    private static final Logger LOG = LoggerFactory.getLogger(SegmentCompactionIT.class);
+
+    private final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+
+    private final Random rnd = new Random();
+    private final ListeningScheduledExecutorService scheduler =
+            listeningDecorator(newScheduledThreadPool(50));
+    private final FileStoreGCMonitor fileStoreGCMonitor = new FileStoreGCMonitor(Clock.SIMPLE);
+    private final TestGCMonitor gcMonitor = new TestGCMonitor(fileStoreGCMonitor);
+    private final Set<ListenableScheduledFuture<?>> writers = newConcurrentHashSet();
+    private final Set<ListenableScheduledFuture<?>> readers = newConcurrentHashSet();
+    private final Set<Reference> references = newConcurrentHashSet();
+    private final SegmentCompactionITMBean segmentCompactionMBean = new SegmentCompactionITMBean();
+    private final CompactionStrategy compactionStrategy = new CompactionStrategy(
+            false, false, CLEAN_OLD, 60000, MEMORY_THRESHOLD_DEFAULT) {
+        @Override
+        public boolean compacted(@Nonnull Callable<Boolean> setHead) throws Exception {
+            return nodeStore.locked(setHead);
+        }
+    };
+
+    private File directory;
+    private FileStore fileStore;
+    private SegmentNodeStore nodeStore;
+    private Registration mBeanRegistration;
+
+    private volatile ListenableFuture<?> compactor = immediateCancelledFuture();
+    private volatile int maxReaders = 10;
+    private volatile int maxWriters = 10;
+    private volatile long maxStoreSize = 200000000000L;
+    private volatile int maxBlobSize = 1000000;
+    private volatile int maxReferences = 10;
+    private volatile int compactionInterval = 1;
+    private volatile boolean stopping;
+    private volatile Reference rootReference;
+
+    public synchronized void stop() {
+        stopping = true;
+        notifyAll();
+    }
+
+    public void addReaders(int count) {
+        for (int c = 0; c < count; c++) {
+            scheduleReader();
+        }
+    }
+
+    public void removeReaders(int count) {
+        remove(readers, count);
+    }
+
+    public void addWriters(int count) {
+        for (int c = 0; c < count; c++) {
+            scheduleWriter();
+        }
+    }
+
+    public void removeWriters(int count) {
+        remove(writers, count);
+    }
+
+    public void removeReferences(int count) {
+        Iterator<Reference> it = references.iterator();
+        while (it.hasNext() && count-- > 0) {
+            it.next().run();
+            it.remove();
+        }
+    }
+
+    private static void remove(Set<ListenableScheduledFuture<?>> futures, int count) {
+        Iterator<ListenableScheduledFuture<?>> it = futures.iterator();
+        while (it.hasNext() && count-- > 0) {
+            it.next().cancel(false);
+        }
+    }
+
+    private Registration registerMBean(Object mBean, final ObjectName objectName)
+            throws NotCompliantMBeanException, InstanceAlreadyExistsException,
+            MBeanRegistrationException {
+        mBeanServer.registerMBean(mBean, objectName);
+        return new Registration(){
+            @Override
+            public void unregister() {
+                try {
+                    mBeanServer.unregisterMBean(objectName);
+                } catch (Exception e) {
+                    LOG.error("Error unregistering Segment Compaction MBean", e);
+                }
+            }
+        };
+    }
+
+    @Before
+    public void setUp() throws IOException, MalformedObjectNameException, NotCompliantMBeanException,
+            InstanceAlreadyExistsException, MBeanRegistrationException {
+        assumeTrue(ENABLED);
+
+        mBeanRegistration = new CompositeRegistration(
+            registerMBean(segmentCompactionMBean, new ObjectName("IT:TYPE=Segment Compaction")),
+            registerMBean(new DefaultCompactionStrategyMBean(compactionStrategy),
+                    new ObjectName("IT:TYPE=Compaction Strategy")),
+            registerMBean(fileStoreGCMonitor, new ObjectName("IT:TYPE=GC Monitor")));
+
+        scheduler.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                fileStoreGCMonitor.run();
+            }
+        }, 1, 1, SECONDS);
+
+        directory = createTempFile(getClass().getSimpleName(), "dir", new File("target"));
+        directory.delete();
+        directory.mkdir();
+
+        fileStore = newFileStore(directory).withGCMonitor(gcMonitor).create();
+        nodeStore = new SegmentNodeStore(fileStore);
+        fileStore.setCompactionStrategy(compactionStrategy);
+    }
+
+    @After
+    public void tearDown() throws InterruptedException {
+        try {
+            if (mBeanRegistration != null) {
+                mBeanRegistration.unregister();
+            }
+            scheduler.shutdown();
+            if (fileStore != null) {
+                fileStore.close();
+            }
+            if (directory != null) {
+                deleteDirectory(directory);
+            }
+        } catch (IOException e) {
+            LOG.error("Error cleaning directory", e);
+        }
+    }
+
+    @Test
+    public void run() throws InterruptedException {
+        scheduleCompactor();
+        addReaders(maxReaders);
+        addWriters(maxWriters);
+
+        synchronized (this) {
+            while (!stopping) {
+                wait();
+            }
+        }
+    }
+
+    private synchronized void scheduleCompactor() {
+        LOG.info("Scheduling compaction after {} minutes", compactionInterval);
+        compactor.cancel(false);
+        compactor = scheduler.schedule((new Compactor(fileStore, gcMonitor)), compactionInterval, MINUTES);
+        addCallback(compactor, new FutureCallback<Object>() {
+            @Override
+            public void onSuccess(Object result) {
+                scheduleCompactor();
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                segmentCompactionMBean.error("Compactor error", t);
+            }
+        });
+    }
+
+    private void scheduleWriter() {
+        if (writers.size() < maxWriters) {
+            final ListenableScheduledFuture<Void> writer = scheduler.schedule(
+                    new RandomWriter(rnd, nodeStore, rnd.nextInt(500), "W" + rnd.nextInt(5)),
+                    rnd.nextInt(30), SECONDS);
+            writers.add(writer);
+            addCallback(writer, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(Void result) {
+                    writers.remove(writer);
+                    if (!writer.isCancelled()) {
+                        scheduleWriter();
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    writers.remove(writer);
+                    segmentCompactionMBean.error("Writer error", t);
+                }
+            });
+        }
+    }
+
+    private void scheduleReader() {
+        if (readers.size() < maxReaders) {
+            final ListenableScheduledFuture<?> reader = rnd.nextBoolean()
+                ? scheduler.schedule(new RandomNodeReader(rnd, nodeStore), rnd.nextInt(30), SECONDS)
+                : scheduler.schedule(new RandomPropertyReader(rnd, nodeStore), rnd.nextInt(30), SECONDS);
+            readers.add(reader);
+            addCallback(reader, new FutureCallback<Object>() {
+                @Override
+                public void onSuccess(Object node) {
+                    readers.remove(reader);
+                    if (!reader.isCancelled()) {
+                        if (rnd.nextBoolean()) {
+                            scheduleReference(node);
+                        } else {
+                            scheduleReader();
+                        }
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    readers.remove(reader);
+                    segmentCompactionMBean.error("Node reader error", t);
+                }
+            });
+        }
+    }
+
+    private void scheduleReference(Object object) {
+        if (references.size() < maxReferences) {
+            final Reference reference = new Reference(object);
+            final ListenableScheduledFuture<?> ref = scheduler.schedule(
+                    reference, rnd.nextInt(600), SECONDS);
+            references.add(reference);
+            addCallback(ref, new FutureCallback<Object>() {
+                @Override
+                public void onSuccess(Object result) {
+                    references.remove(reference);
+                    if (!ref.isCancelled()) {
+                        scheduleReader();
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    references.remove(reference);
+                    segmentCompactionMBean.error("Reference error", t);
+                }
+            });
+        } else {
+            scheduleReader();
+        }
+    }
+
+    private class RandomWriter implements Callable<Void> {
+        private final Random rnd;
+        private final NodeStore nodeStore;
+        private final int opCount;
+        private final String itemPrefix;
+
+        RandomWriter(Random rnd, NodeStore nodeStore, int opCount, String itemPrefix) {
+            this.rnd = rnd;
+            this.nodeStore = nodeStore;
+            this.opCount = opCount;
+            this.itemPrefix = itemPrefix;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            NodeBuilder root = nodeStore.getRoot().builder();
+            boolean deleteOnly = fileStore.size() > maxStoreSize;
+            for (int k = 0; k < opCount; k++) {
+                modify(nodeStore, root, deleteOnly);
+            }
+            nodeStore.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            return null;
+        }
+
+        private void modify(NodeStore nodeStore, NodeBuilder nodeBuilder, boolean deleteOnly)
+                throws IOException {
+            int k = rnd.nextInt(100);
+            if (k < 10) {
+                if (!nodeBuilder.remove()) {
+                    descent(nodeStore, nodeBuilder, deleteOnly);
+                }
+            } else if (k < 40 && !deleteOnly)  {
+                nodeBuilder.setChildNode('N' + itemPrefix + rnd.nextInt(1000));
+            } else if (k < 80 && !deleteOnly) {
+                nodeBuilder.setProperty('P' + itemPrefix + rnd.nextInt(1000),
+                        randomAlphabetic(rnd.nextInt(10000)));
+            } else if (k < 90 && !deleteOnly) {
+                nodeBuilder.setProperty('B' + itemPrefix + rnd.nextInt(1000),
+                        createBlob(nodeStore, rnd.nextInt(maxBlobSize)));
+            } else {
+                descent(nodeStore, nodeBuilder, deleteOnly);
+            }
+        }
+
+        private void descent(NodeStore nodeStore, NodeBuilder nodeBuilder, boolean deleteOnly)
+                throws IOException {
+            long count = nodeBuilder.getChildNodeCount(Long.MAX_VALUE);
+            if (count > 0) {
+                int c = rnd.nextInt((int) count);
+                String name = get(nodeBuilder.getChildNodeNames(), c);
+                modify(nodeStore, nodeBuilder.getChildNode(name), deleteOnly);
+            }
+        }
+
+        private Blob createBlob(NodeStore nodeStore, int size) throws IOException {
+            byte[] data = new byte[size];
+            new Random().nextBytes(data);
+            return nodeStore.createBlob(new ByteArrayInputStream(data));
+        }
+    }
+
+    private abstract static class RandomReader<T> implements Callable<T> {
+        protected final Random rnd;
+        protected final NodeStore nodeStore;
+
+        RandomReader(Random rnd, NodeStore nodeStore) {
+            this.rnd = rnd;
+            this.nodeStore = nodeStore;
+        }
+
+        protected final NodeState readRandomTree(NodeState node) {
+            int i = rnd.nextInt(1 + (int) node.getChildNodeCount(Long.MAX_VALUE));
+            if (i != 0) {
+                return readRandomTree(get(node.getChildNodeEntries(), i - 1).getNodeState());
+            } else {
+                return node;
+            }
+        }
+
+        protected final PropertyState readRandomProperty(NodeState node) throws Exception {
+            int count = (int) node.getPropertyCount();
+            if (count > 0) {
+                return get(node.getProperties(), rnd.nextInt(count));
+            } else {
+                return null;
+            }
+        }
+    }
+
+    private static class RandomNodeReader extends RandomReader<NodeState> {
+        RandomNodeReader(Random rnd, NodeStore nodeStore) {
+            super(rnd, nodeStore);
+        }
+
+        @Override
+        public NodeState call() throws Exception {
+            return readRandomTree(nodeStore.getRoot());
+        }
+    }
+
+    private static class RandomPropertyReader extends RandomReader<PropertyState> {
+        RandomPropertyReader(Random rnd, NodeStore nodeStore) {
+            super(rnd, nodeStore);
+        }
+
+        @Override
+        public PropertyState call() throws Exception {
+            return readRandomProperty(readRandomTree(nodeStore.getRoot()));
+        }
+    }
+
+    private static class Reference implements Runnable {
+        private volatile Object referent;
+
+        Reference(Object referent) {
+            this.referent = referent;
+        }
+
+        @Override
+        public void run() {
+            referent = null;
+        }
+    }
+
+    private static class Compactor implements Runnable {
+        private final FileStore fileStore;
+        private final TestGCMonitor gcMonitor;
+
+        Compactor(FileStore fileStore, TestGCMonitor gcMonitor) {
+            this.fileStore = fileStore;
+            this.gcMonitor = gcMonitor;
+        }
+
+        @Override
+        public void run() {
+            if (gcMonitor.isCleaned()) {
+                LOG.info("Running compaction");
+                gcMonitor.resetCleaned();
+                fileStore.maybeCompact(true);
+            } else {
+                LOG.info("Not running compaction as no cleanup has taken place");
+            }
+        }
+    }
+
+    private static class TestGCMonitor implements GCMonitor {
+        private final GCMonitor delegate;
+        private volatile boolean cleaned = true;
+        private volatile long lastCompacted;
+
+        TestGCMonitor(GCMonitor delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public void info(String message, Object... arguments) {
+            System.out.println(arrayFormat(message, arguments).getMessage());
+            delegate.info(message, arguments);
+        }
+
+        @Override
+        public void warn(String message, Object... arguments) {
+            System.out.println(arrayFormat(message, arguments).getMessage());
+            delegate.warn(message, arguments);
+        }
+
+        @Override
+        public void error(String message, Exception exception) {
+            System.out.println(format(message, exception).getMessage());
+            delegate.error(message, exception);
+        }
+
+        @Override
+        public void skipped(String reason, Object... arguments) {
+            cleaned = true;
+            System.out.println(arrayFormat(reason, arguments).getMessage());
+            delegate.skipped(reason, arguments);
+        }
+
+        @Override
+        public void compacted() {
+            delegate.compacted();
+            lastCompacted = System.currentTimeMillis();
+        }
+
+        @Override
+        public void cleaned(long reclaimedSize, long currentSize) {
+            cleaned = true;
+            delegate.cleaned(reclaimedSize, currentSize);
+        }
+
+        public boolean isCleaned() {
+            return cleaned;
+        }
+
+        public void resetCleaned() {
+            cleaned = false;
+        }
+
+        public long getLastCompacted() {
+            return lastCompacted;
+        }
+    }
+
+    private class SegmentCompactionITMBean extends AnnotatedStandardMBean implements SegmentCompactionMBean {
+        private String lastError;
+
+        SegmentCompactionITMBean() {
+            super(SegmentCompactionMBean.class);
+        }
+
+        @Override
+        public void stop() {
+            SegmentCompactionIT.this.stop();
+        }
+
+        @Override
+        public void setCompactionInterval(int minutes) {
+            if (compactionInterval != minutes) {
+                compactionInterval = minutes;
+                scheduleCompactor();
+            }
+        }
+
+        @Override
+        public int getCompactionInterval() {
+            return compactionInterval;
+        }
+
+        @Override
+        public String getLastCompaction() {
+            return valueOf(new Date(gcMonitor.getLastCompacted()));
+        }
+
+        @Override
+        public void setMaxReaders(int count) {
+            checkArgument(count >= 0);
+            maxReaders = count;
+            if (count > readers.size()) {
+                addReaders(count - readers.size());
+            } else {
+                removeReaders(readers.size() - count);
+            }
+        }
+
+        @Override
+        public int getMaxReaders() {
+            return maxReaders;
+        }
+
+        @Override
+        public void setMaxWriters(int count) {
+            checkArgument(count >= 0);
+            maxWriters = count;
+            if (count > writers.size()) {
+                addWriters(count - writers.size());
+            } else {
+                removeWriters(writers.size() - count);
+            }
+        }
+
+        @Override
+        public int getMaxWriters() {
+            return maxWriters;
+        }
+
+        @Override
+        public void setMaxStoreSize(long size) {
+            maxStoreSize = size;
+        }
+
+        @Override
+        public long getMaxStoreSize() {
+            return maxStoreSize;
+        }
+
+        @Override
+        public void setMaxBlobSize(int size) {
+            maxBlobSize = size;
+        }
+
+        @Override
+        public int getMaxBlobSize() {
+            return maxBlobSize;
+        }
+
+        @Override
+        public void setMaxReferences(int count) {
+            checkArgument(count >= 0);
+            maxReferences = count;
+            if (count < references.size()) {
+                removeReferences(references.size() - count);
+            }
+        }
+
+        @Override
+        public int getMaxReferences() {
+            return maxReferences;
+        }
+
+        @Override
+        public void setRootReference(boolean set) {
+            if (set && rootReference == null) {
+                rootReference = new Reference(nodeStore.getRoot());
+            } else {
+                rootReference = null;
+            }
+        }
+
+        @Override
+        public boolean getRootReference() {
+            return rootReference != null;
+        }
+
+        @Override
+        public int getReaderCount() {
+            return readers.size();
+        }
+
+        @Override
+        public int getWriterCount() {
+            return writers.size();
+        }
+
+        @Override
+        public int getReferenceCount() {
+            return references.size();
+        }
+
+        @Override
+        public long getFileStoreSize() {
+            try {
+                return fileStore.size();
+            } catch (IOException e) {
+                error("Error getting size of file store", e);
+                return -1;
+            }
+        }
+
+        @Override
+        public long getCompactionMapWeight() {
+            return fileStore.getTracker().getCompactionMap().getEstimatedWeight();
+        }
+
+        @Override
+        public int getCompactionMapDepth() {
+            return fileStore.getTracker().getCompactionMap().getDepth();
+        }
+
+        @Override
+        public String getLastError() {
+            return lastError;
+        }
+
+        void error(String message, Throwable t) {
+            if (!(t instanceof CancellationException)) {
+                StringWriter sw = new StringWriter();
+                sw.write(message + ": ");
+                t.printStackTrace(new PrintWriter(sw));
+                lastError = sw.toString();
+
+                LOG.error(message, t);
+            }
+        }
+    }
+}

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionMBean.java?rev=1673787&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionMBean.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionMBean.java Wed Apr 15 14:41:02 2015
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment;
+
+/**
+ * MBean for monitoring and interacting with the {@link SegmentCompactionIT}
+ * longevity test.
+ */
+public interface SegmentCompactionMBean {
+
+    /**
+     * Stop the test.
+     */
+    void stop();
+
+    /**
+     * Set the compaction interval
+     * @param minutes  number of minutes to wait between compaction cycles.
+     */
+    void setCompactionInterval(int minutes);
+
+    /**
+     * @return  the compaction interval in minutes.
+     */
+    int getCompactionInterval();
+
+    /**
+     * @return  Time stamp from when compaction last ran.
+     */
+    String getLastCompaction();
+
+    /**
+     * Set the maximal number of concurrent readers
+     * @param count  maximal number of concurrent readers
+     */
+    void setMaxReaders(int count);
+
+    /**
+     * @return  maximal number of concurrent readers
+     */
+    int getMaxReaders();
+
+    /**
+     * Set the maximal number of concurrent writers
+     * @param count  maximal number of concurrent writers
+     */
+    void setMaxWriters(int count);
+
+    /**
+     * @return  maximal number of concurrent writers
+     */
+    int getMaxWriters();
+
+    /**
+     * Set the maximal size of the store
+     * @param size  size in bytes
+     */
+    void setMaxStoreSize(long size);
+
+    /**
+     * @return  maximal size of the store in bytes
+     */
+    long getMaxStoreSize();
+
+    /**
+     * Set the maximal size of binary properties
+     * @param size  size in bytes
+     */
+    void setMaxBlobSize(int size);
+
+    /**
+     * @return  maximal size of binary properties in bytes
+     */
+    int getMaxBlobSize();
+
+    /**
+     * Set the maximal number of held references
+     * @param count  maximal number of references
+     */
+    void setMaxReferences(int count);
+
+    /**
+     * @return  maximal number of held references
+     */
+    int getMaxReferences();
+
+    /**
+     * Add a reference to the current root or release a held reference.
+     * @param set  add a reference if {@code true}, otherwise release any held reference
+     */
+    void setRootReference(boolean set);
+
+    /**
+     * @return  {@code true} if currently a root reference is being held. {@code false} otherwise.
+     */
+    boolean getRootReference();
+
+    /**
+     * @return  actual number of concurrent readers
+     */
+    int getReaderCount();
+
+    /**
+     * @return  actual number of concurrent writers
+     */
+    int getWriterCount();
+
+    /**
+     * @return actual number of held references (not including any root reference)
+     */
+    int getReferenceCount();
+
+    /**
+     * @return  current size of the {@link org.apache.jackrabbit.oak.plugins.segment.file.FileStore}
+     */
+    long getFileStoreSize();
+
+    /**
+     * @return  current weight of the compaction map
+     */
+    long getCompactionMapWeight();
+
+    /**
+     * @return  current depth of the compaction map
+     */
+    int getCompactionMapDepth();
+
+    /**
+     * @return  last error
+     */
+    String getLastError();
+}