You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2018/09/20 11:59:03 UTC

svn commit: r1841460 - in /jackrabbit/oak/trunk/oak-segment-tar/src: main/java/org/apache/jackrabbit/oak/segment/ main/java/org/apache/jackrabbit/oak/segment/file/ test/java/org/apache/jackrabbit/oak/segment/

Author: mduerig
Date: Thu Sep 20 11:59:03 2018
New Revision: 1841460

URL: http://svn.apache.org/viewvc?rev=1841460&view=rev
Log:
OAK-7773: Implement monitoring for allocated byte buffers
Initial implementation

Added:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitor.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitorTest.java
Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitor.java?rev=1841460&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitor.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitor.java Thu Sep 20 11:59:03 2018
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import static com.google.common.collect.Sets.newConcurrentHashSet;
+import static org.apache.jackrabbit.oak.stats.StatsOptions.METRICS_ONLY;
+
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.apache.jackrabbit.oak.stats.CounterStats;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * This class exposes {@link CounterStats} for allocations and de-allocations
+ * of {@link ByteBuffer} instances:
+ * <ul>
+ *     <li>{@link #DIRECT_BUFFER_COUNT}: number of allocated direct byte
+ *          buffers.</li>
+ *     <li>{@link #DIRECT_BUFFER_CAPACITY}: total capacity of the allocated
+ *          direct byte buffers.</li>
+ *     <li>{@link #HEAP_BUFFER_COUNT}: number of allocated heap byte
+ *          buffers.</li>
+ *     <li>{@link #HEAP_BUFFER_CAPACITY}: total capacity of the allocated
+ *          heap byte buffers.</li>
+ * </ul>
+ * <p>
+ * Users of this class call {@link #trackAllocation(ByteBuffer)} to update the above statistics.
+ */
+public class SegmentBufferMonitor {
+
+    /**
+     * Number of allocated direct byte buffers.
+     */
+    public static final String DIRECT_BUFFER_COUNT = "oak.segment.direct-buffer-count";
+
+    /**
+     * Total capacity of the allocated direct byte buffers.
+     */
+    public static final String DIRECT_BUFFER_CAPACITY = "oak.segment.direct-buffer-capacity";
+
+    /**
+     * Number of allocated heap byte buffers.
+     */
+    public static final String HEAP_BUFFER_COUNT = "oak.segment.heap-buffer-count";
+
+    /**
+     * Total capacity of the allocated heap byte buffers.
+     */
+    public static final String HEAP_BUFFER_CAPACITY = "oak.segment.heap-buffer-capacity";
+
+    @NotNull
+    private final Set<BufferReference> buffers = newConcurrentHashSet();
+
+    @NotNull
+    private final ReferenceQueue<ByteBuffer> referenceQueue = new ReferenceQueue<>();
+
+    @NotNull
+    private final CounterStats directBufferCount;
+
+    @NotNull
+    private final CounterStats directBufferCapacity;
+
+    @NotNull
+    private final CounterStats heapBufferCount;
+
+    @NotNull
+    private final CounterStats heapBufferCapacity;
+
+    /**
+     * Create a new instance using the passed {@code statisticsProvider} to expose
+     * buffer allocations.
+     * @param statisticsProvider  provider of the four {@link CounterStats} instances updated by this monitor
+     */
+    public SegmentBufferMonitor(@NotNull StatisticsProvider statisticsProvider) {
+        directBufferCount = statisticsProvider.getCounterStats(DIRECT_BUFFER_COUNT, METRICS_ONLY);
+        directBufferCapacity = statisticsProvider.getCounterStats(DIRECT_BUFFER_CAPACITY, METRICS_ONLY);
+        heapBufferCount = statisticsProvider.getCounterStats(HEAP_BUFFER_COUNT, METRICS_ONLY);
+        heapBufferCapacity = statisticsProvider.getCounterStats(HEAP_BUFFER_CAPACITY, METRICS_ONLY);
+    }
+
+    private static class BufferReference extends WeakReference<ByteBuffer> {  // weak reference that remembers the buffer's kind and size
+        private final int capacity;  // buffer.capacity() captured at construction, readable after the referent is cleared
+        private final boolean isDirect;  // buffer.isDirect() captured at construction, readable after the referent is cleared
+
+        public BufferReference(@NotNull ByteBuffer buffer,
+                               @NotNull ReferenceQueue<ByteBuffer> queue) {
+            super(buffer, queue);  // register this reference with the passed queue
+            this.capacity = buffer.capacity();
+            this.isDirect = buffer.isDirect();
+        }
+    }
+
+    /**
+     * Track the allocation of a {@code buffer} and update the exposed statistics.
+     * @param buffer  the newly allocated buffer to account for
+     */
+    public void trackAllocation(@NotNull ByteBuffer buffer) {
+        BufferReference reference = new BufferReference(buffer, referenceQueue);
+        buffers.add(reference);  // hold on to the reference until it is polled from the queue
+        allocated(reference);
+        trackDeallocations();  // also account for any references enqueued since the previous call
+    }
+
+    private void trackDeallocations() {  // drain the reference queue and decrement the counters for each entry
+        BufferReference reference = (BufferReference) referenceQueue.poll();  // poll() does not block
+        while (reference != null) {  // poll() returns null once no further reference is available
+            buffers.remove(reference);
+            deallocated(reference);
+            reference = (BufferReference) referenceQueue.poll();
+        }
+    }
+
+    private void allocated(@NotNull BufferReference reference) {  // add one buffer and its capacity to the counters matching its kind
+        if (reference.isDirect) {
+            directBufferCount.inc();
+            directBufferCapacity.inc(reference.capacity);
+        } else {  // not direct: counted as a heap buffer
+            heapBufferCount.inc();
+            heapBufferCapacity.inc(reference.capacity);
+        }
+    }
+
+    private void deallocated(@NotNull BufferReference reference) {  // subtract one buffer and its capacity from the counters matching its kind
+        if (reference.isDirect) {
+            directBufferCount.dec();
+            directBufferCapacity.dec(reference.capacity);
+        } else {  // not direct: counted as a heap buffer
+            heapBufferCount.dec();
+            heapBufferCapacity.dec(reference.capacity);
+        }
+    }
+
+}

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java?rev=1841460&r1=1841459&r2=1841460&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java Thu Sep 20 11:59:03 2018
@@ -42,6 +42,7 @@ import org.apache.jackrabbit.oak.segment
 import org.apache.jackrabbit.oak.segment.SegmentIdFactory;
 import org.apache.jackrabbit.oak.segment.SegmentIdProvider;
 import org.apache.jackrabbit.oak.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.segment.SegmentBufferMonitor;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
 import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
 import org.apache.jackrabbit.oak.segment.SegmentReader;
@@ -119,6 +120,9 @@ public abstract class AbstractFileStore
 
     };
 
+    @NotNull
+    private final SegmentBufferMonitor segmentBufferMonitor;
+
     protected final IOMonitor ioMonitor;
 
     AbstractFileStore(final FileStoreBuilder builder) {
@@ -134,6 +138,7 @@ public abstract class AbstractFileStore
         this.segmentReader = new CachingSegmentReader(this::getWriter, blobStore, builder.getStringCacheSize(), builder.getTemplateCacheSize());
         this.memoryMapping = builder.getMemoryMapping();
         this.ioMonitor = builder.getIOMonitor();
+        this.segmentBufferMonitor = new SegmentBufferMonitor(builder.getStatsProvider());
     }
 
     static SegmentNotFoundException asSegmentNotFoundException(Exception e, SegmentId id) {
@@ -268,6 +273,7 @@ public abstract class AbstractFileStore
         if (buffer == null) {
             throw new SegmentNotFoundException(id);
         }
+        segmentBufferMonitor.trackAllocation(buffer);
         return new Segment(tracker, segmentReader, id, buffer);
     }
 

Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitorTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitorTest.java?rev=1841460&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitorTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitorTest.java Thu Sep 20 11:59:03 2018
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import static com.google.common.collect.Maps.newHashMap;
+import static org.apache.jackrabbit.oak.segment.SegmentBufferMonitor.DIRECT_BUFFER_CAPACITY;
+import static org.apache.jackrabbit.oak.segment.SegmentBufferMonitor.DIRECT_BUFFER_COUNT;
+import static org.apache.jackrabbit.oak.segment.SegmentBufferMonitor.HEAP_BUFFER_CAPACITY;
+import static org.apache.jackrabbit.oak.segment.SegmentBufferMonitor.HEAP_BUFFER_COUNT;
+import static org.apache.jackrabbit.oak.stats.SimpleStats.Type.COUNTER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.jackrabbit.api.stats.RepositoryStatistics;
+import org.apache.jackrabbit.oak.stats.CounterStats;
+import org.apache.jackrabbit.oak.stats.HistogramStats;
+import org.apache.jackrabbit.oak.stats.MeterStats;
+import org.apache.jackrabbit.oak.stats.SimpleStats;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatsOptions;
+import org.apache.jackrabbit.oak.stats.TimerStats;
+import org.junit.Test;
+
+public class SegmentBufferMonitorTest {
+
+    private final Map<String, CounterStats> stats = newHashMap();  // filled by the stub provider below, so it must be declared before segmentBufferMonitor
+
+    private final SegmentBufferMonitor segmentBufferMonitor = new SegmentBufferMonitor(new StatisticsProvider() {  // stub provider: only counters are supported
+        @Override
+        public RepositoryStatistics getStats() {
+            throw new IllegalStateException();  // unsupported by this stub
+        }
+
+        @Override
+        public MeterStats getMeter(String name, StatsOptions options) {
+            throw new IllegalStateException();  // unsupported by this stub
+        }
+
+        @Override
+        public CounterStats getCounterStats(String name, StatsOptions options) {
+            SimpleStats simpleStats = new SimpleStats(new AtomicLong(), COUNTER);
+            stats.put(name, simpleStats);  // record the counter under its name so the tests can read it back
+            return simpleStats;
+        }
+
+        @Override
+        public TimerStats getTimer(String name, StatsOptions options) {
+            throw new IllegalStateException();  // unsupported by this stub
+        }
+
+        @Override
+        public HistogramStats getHistogram(String name, StatsOptions options) {
+            throw new IllegalStateException();  // unsupported by this stub
+        }
+    });
+
+    @Test
+    public void emptyStats() {  // without any tracked allocation all four counters read zero
+        assertEquals(0, stats.get(DIRECT_BUFFER_COUNT).getCount());
+        assertEquals(0, stats.get(DIRECT_BUFFER_CAPACITY).getCount());
+        assertEquals(0, stats.get(HEAP_BUFFER_COUNT).getCount());
+        assertEquals(0, stats.get(HEAP_BUFFER_CAPACITY).getCount());
+    }
+
+    @Test
+    public void heapBuffer() {  // tracking a heap buffer updates only the heap counters
+        ByteBuffer buffer = ByteBuffer.allocate(42);
+        segmentBufferMonitor.trackAllocation(buffer);
+
+        assertEquals(0, stats.get(DIRECT_BUFFER_COUNT).getCount());
+        assertEquals(0, stats.get(DIRECT_BUFFER_CAPACITY).getCount());
+        assertEquals(1, stats.get(HEAP_BUFFER_COUNT).getCount());
+        assertEquals(42, stats.get(HEAP_BUFFER_CAPACITY).getCount());
+
+        buffer = null;  // drop the only strong reference held by the test
+        System.gc();  // a request only; collection of the buffer is not guaranteed
+
+        assertEquals(0, stats.get(DIRECT_BUFFER_COUNT).getCount());
+        assertEquals(0, stats.get(DIRECT_BUFFER_CAPACITY).getCount());
+        assertTrue(stats.get(HEAP_BUFFER_COUNT).getCount() <= 1);  // NOTE(review): upper bound only; nothing polls the reference queue after the GC request
+        assertTrue(stats.get(HEAP_BUFFER_CAPACITY).getCount() <= 42);  // NOTE(review): upper bound only, see above
+    }
+
+    @Test
+    public void directBuffer() {  // tracking a direct buffer updates only the direct counters
+        ByteBuffer buffer = ByteBuffer.allocateDirect(42);
+        segmentBufferMonitor.trackAllocation(buffer);
+
+        assertEquals(1, stats.get(DIRECT_BUFFER_COUNT).getCount());
+        assertEquals(42, stats.get(DIRECT_BUFFER_CAPACITY).getCount());
+        assertEquals(0, stats.get(HEAP_BUFFER_COUNT).getCount());
+        assertEquals(0, stats.get(HEAP_BUFFER_CAPACITY).getCount());
+
+        buffer = null;  // drop the only strong reference held by the test
+        System.gc();  // a request only; collection of the buffer is not guaranteed
+
+        assertTrue(stats.get(DIRECT_BUFFER_COUNT).getCount() <= 1);  // NOTE(review): upper bound only; nothing polls the reference queue after the GC request
+        assertTrue(stats.get(DIRECT_BUFFER_CAPACITY).getCount() <= 42);  // NOTE(review): upper bound only, see above
+        assertEquals(0, stats.get(HEAP_BUFFER_COUNT).getCount());
+        assertEquals(0, stats.get(HEAP_BUFFER_CAPACITY).getCount());
+    }
+}