You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2018/09/20 11:59:03 UTC
svn commit: r1841460 - in /jackrabbit/oak/trunk/oak-segment-tar/src:
main/java/org/apache/jackrabbit/oak/segment/
main/java/org/apache/jackrabbit/oak/segment/file/
test/java/org/apache/jackrabbit/oak/segment/
Author: mduerig
Date: Thu Sep 20 11:59:03 2018
New Revision: 1841460
URL: http://svn.apache.org/viewvc?rev=1841460&view=rev
Log:
OAK-7773: Implement monitoring for allocated byte buffers
Initial implementation
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitor.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitorTest.java
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitor.java?rev=1841460&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitor.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitor.java Thu Sep 20 11:59:03 2018
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import static com.google.common.collect.Sets.newConcurrentHashSet;
+import static org.apache.jackrabbit.oak.stats.StatsOptions.METRICS_ONLY;
+
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.apache.jackrabbit.oak.stats.CounterStats;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * This class exposes {@link CounterStats} for allocations and de-allocations
+ * of {@link ByteBuffer} instances:
+ * <ul>
+ * <li>{@link #DIRECT_BUFFER_COUNT}: number of allocated direct byte
+ * buffers.</li>
+ * <li>{@link #DIRECT_BUFFER_CAPACITY}: total capacity of the allocated
+ * direct byte buffers.</li>
+ * <li>{@link #HEAP_BUFFER_COUNT}: number of allocated heap byte
+ * buffers.</li>
+ * <li>{@link #HEAP_BUFFER_CAPACITY}: total capacity of the allocated
+ * heap byte buffers.</li>
+ * </ul>
+ * <p>
+ * Users of this class call {@link #trackAllocation(ByteBuffer)} to update the above statistics; de-allocations are only accounted for during such a call.
+ */
+public class SegmentBufferMonitor {
+
+ /**
+ * Name of the counter for the number of allocated direct byte buffers.
+ */
+ public static final String DIRECT_BUFFER_COUNT = "oak.segment.direct-buffer-count";
+
+ /**
+ * Name of the counter for the total capacity of the allocated direct byte buffers.
+ */
+ public static final String DIRECT_BUFFER_CAPACITY = "oak.segment.direct-buffer-capacity";
+
+ /**
+ * Name of the counter for the number of allocated heap byte buffers.
+ */
+ public static final String HEAP_BUFFER_COUNT = "oak.segment.heap-buffer-count";
+
+ /**
+ * Name of the counter for the total capacity of the allocated heap byte buffers.
+ */
+ public static final String HEAP_BUFFER_CAPACITY = "oak.segment.heap-buffer-capacity";
+
+ @NotNull
+ private final Set<BufferReference> buffers = newConcurrentHashSet();
+
+ @NotNull
+ private final ReferenceQueue<ByteBuffer> referenceQueue = new ReferenceQueue<>();
+
+ @NotNull
+ private final CounterStats directBufferCount;
+
+ @NotNull
+ private final CounterStats directBufferCapacity;
+
+ @NotNull
+ private final CounterStats heapBufferCount;
+
+ @NotNull
+ private final CounterStats heapBufferCapacity;
+
+ /**
+ * Create a new instance using the passed {@code statisticsProvider} to expose
+ * buffer allocations.
+ * @param statisticsProvider provider of the four counters named by the constants of this class
+ */
+ public SegmentBufferMonitor(@NotNull StatisticsProvider statisticsProvider) {
+ directBufferCount = statisticsProvider.getCounterStats(DIRECT_BUFFER_COUNT, METRICS_ONLY);
+ directBufferCapacity = statisticsProvider.getCounterStats(DIRECT_BUFFER_CAPACITY, METRICS_ONLY);
+ heapBufferCount = statisticsProvider.getCounterStats(HEAP_BUFFER_COUNT, METRICS_ONLY);
+ heapBufferCapacity = statisticsProvider.getCounterStats(HEAP_BUFFER_CAPACITY, METRICS_ONLY);
+ }
+
+ private static class BufferReference extends WeakReference<ByteBuffer> {
+ private final int capacity; // copied from the buffer on construction so it stays available once the referent is gone
+ private final boolean isDirect; // copied on construction for the same reason
+
+ public BufferReference(@NotNull ByteBuffer buffer,
+ @NotNull ReferenceQueue<ByteBuffer> queue) {
+ super(buffer, queue);
+ this.capacity = buffer.capacity();
+ this.isDirect = buffer.isDirect();
+ }
+ }
+
+ /**
+ * Track the allocation of a {@code buffer} and update the exposed statistics.
+ * @param buffer the newly allocated buffer; this monitor only keeps a weak reference to it
+ */
+ public void trackAllocation(@NotNull ByteBuffer buffer) {
+ BufferReference reference = new BufferReference(buffer, referenceQueue);
+ buffers.add(reference); // holds the reference object until trackDeallocations() removes it
+ allocated(reference);
+ trackDeallocations();
+ }
+
+ private void trackDeallocations() { // drains the reference queue, undoing the statistics of each dequeued reference
+ BufferReference reference = (BufferReference) referenceQueue.poll();
+ while (reference != null) {
+ buffers.remove(reference);
+ deallocated(reference);
+ reference = (BufferReference) referenceQueue.poll();
+ }
+ }
+
+ private void allocated(@NotNull BufferReference reference) {
+ if (reference.isDirect) {
+ directBufferCount.inc();
+ directBufferCapacity.inc(reference.capacity);
+ } else {
+ heapBufferCount.inc();
+ heapBufferCapacity.inc(reference.capacity);
+ }
+ }
+
+ private void deallocated(@NotNull BufferReference reference) {
+ if (reference.isDirect) {
+ directBufferCount.dec();
+ directBufferCapacity.dec(reference.capacity);
+ } else {
+ heapBufferCount.dec();
+ heapBufferCapacity.dec(reference.capacity);
+ }
+ }
+
+}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java?rev=1841460&r1=1841459&r2=1841460&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java Thu Sep 20 11:59:03 2018
@@ -42,6 +42,7 @@ import org.apache.jackrabbit.oak.segment
import org.apache.jackrabbit.oak.segment.SegmentIdFactory;
import org.apache.jackrabbit.oak.segment.SegmentIdProvider;
import org.apache.jackrabbit.oak.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.segment.SegmentBufferMonitor;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
import org.apache.jackrabbit.oak.segment.SegmentReader;
@@ -119,6 +120,9 @@ public abstract class AbstractFileStore
};
+ @NotNull
+ private final SegmentBufferMonitor segmentBufferMonitor;
+
protected final IOMonitor ioMonitor;
AbstractFileStore(final FileStoreBuilder builder) {
@@ -134,6 +138,7 @@ public abstract class AbstractFileStore
this.segmentReader = new CachingSegmentReader(this::getWriter, blobStore, builder.getStringCacheSize(), builder.getTemplateCacheSize());
this.memoryMapping = builder.getMemoryMapping();
this.ioMonitor = builder.getIOMonitor();
+ this.segmentBufferMonitor = new SegmentBufferMonitor(builder.getStatsProvider());
}
static SegmentNotFoundException asSegmentNotFoundException(Exception e, SegmentId id) {
@@ -268,6 +273,7 @@ public abstract class AbstractFileStore
if (buffer == null) {
throw new SegmentNotFoundException(id);
}
+ segmentBufferMonitor.trackAllocation(buffer);
return new Segment(tracker, segmentReader, id, buffer);
}
Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitorTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitorTest.java?rev=1841460&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitorTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitorTest.java Thu Sep 20 11:59:03 2018
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import static com.google.common.collect.Maps.newHashMap;
+import static org.apache.jackrabbit.oak.segment.SegmentBufferMonitor.DIRECT_BUFFER_CAPACITY;
+import static org.apache.jackrabbit.oak.segment.SegmentBufferMonitor.DIRECT_BUFFER_COUNT;
+import static org.apache.jackrabbit.oak.segment.SegmentBufferMonitor.HEAP_BUFFER_CAPACITY;
+import static org.apache.jackrabbit.oak.segment.SegmentBufferMonitor.HEAP_BUFFER_COUNT;
+import static org.apache.jackrabbit.oak.stats.SimpleStats.Type.COUNTER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.jackrabbit.api.stats.RepositoryStatistics;
+import org.apache.jackrabbit.oak.stats.CounterStats;
+import org.apache.jackrabbit.oak.stats.HistogramStats;
+import org.apache.jackrabbit.oak.stats.MeterStats;
+import org.apache.jackrabbit.oak.stats.SimpleStats;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatsOptions;
+import org.apache.jackrabbit.oak.stats.TimerStats;
+import org.junit.Test;
+
+public class SegmentBufferMonitorTest { // checks the counters SegmentBufferMonitor registers, using an in-memory StatisticsProvider
+
+ private final Map<String, CounterStats> stats = newHashMap(); // counters handed out by the provider below, keyed by counter name
+
+ private final SegmentBufferMonitor segmentBufferMonitor = new SegmentBufferMonitor(new StatisticsProvider() {
+ @Override
+ public RepositoryStatistics getStats() {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public MeterStats getMeter(String name, StatsOptions options) {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public CounterStats getCounterStats(String name, StatsOptions options) {
+ SimpleStats simpleStats = new SimpleStats(new AtomicLong(), COUNTER);
+ stats.put(name, simpleStats); // remember the counter so the tests can read its value back
+ return simpleStats;
+ }
+
+ @Override
+ public TimerStats getTimer(String name, StatsOptions options) {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public HistogramStats getHistogram(String name, StatsOptions options) {
+ throw new IllegalStateException();
+ }
+ });
+
+ @Test
+ public void emptyStats() {
+ assertEquals(0, stats.get(DIRECT_BUFFER_COUNT).getCount());
+ assertEquals(0, stats.get(DIRECT_BUFFER_CAPACITY).getCount());
+ assertEquals(0, stats.get(HEAP_BUFFER_COUNT).getCount());
+ assertEquals(0, stats.get(HEAP_BUFFER_CAPACITY).getCount());
+ }
+
+ @Test
+ public void heapBuffer() {
+ ByteBuffer buffer = ByteBuffer.allocate(42);
+ segmentBufferMonitor.trackAllocation(buffer);
+
+ assertEquals(0, stats.get(DIRECT_BUFFER_COUNT).getCount());
+ assertEquals(0, stats.get(DIRECT_BUFFER_CAPACITY).getCount());
+ assertEquals(1, stats.get(HEAP_BUFFER_COUNT).getCount());
+ assertEquals(42, stats.get(HEAP_BUFFER_CAPACITY).getCount());
+
+ buffer = null;
+ System.gc(); // only a hint; the monitor is not called again afterwards, hence the upper-bound checks below
+
+ assertEquals(0, stats.get(DIRECT_BUFFER_COUNT).getCount());
+ assertEquals(0, stats.get(DIRECT_BUFFER_CAPACITY).getCount());
+ assertTrue(stats.get(HEAP_BUFFER_COUNT).getCount() <= 1);
+ assertTrue(stats.get(HEAP_BUFFER_CAPACITY).getCount() <= 42);
+ }
+
+ @Test
+ public void directBuffer() {
+ ByteBuffer buffer = ByteBuffer.allocateDirect(42);
+ segmentBufferMonitor.trackAllocation(buffer);
+
+ assertEquals(1, stats.get(DIRECT_BUFFER_COUNT).getCount());
+ assertEquals(42, stats.get(DIRECT_BUFFER_CAPACITY).getCount());
+ assertEquals(0, stats.get(HEAP_BUFFER_COUNT).getCount());
+ assertEquals(0, stats.get(HEAP_BUFFER_CAPACITY).getCount());
+
+ buffer = null;
+ System.gc(); // only a hint; the monitor is not called again afterwards, hence the upper-bound checks below
+
+ assertTrue(stats.get(DIRECT_BUFFER_COUNT).getCount() <= 1);
+ assertTrue(stats.get(DIRECT_BUFFER_CAPACITY).getCount() <= 42);
+ assertEquals(0, stats.get(HEAP_BUFFER_COUNT).getCount());
+ assertEquals(0, stats.get(HEAP_BUFFER_CAPACITY).getCount());
+ }
+}