You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by jb...@apache.org on 2020/09/24 11:42:24 UTC

[karaf] branch karaf-4.2.x updated: rewrite the CicularBuffer to reduce logservice overhead

This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch karaf-4.2.x
in repository https://gitbox.apache.org/repos/asf/karaf.git


The following commit(s) were added to refs/heads/karaf-4.2.x by this push:
     new c4a8214  rewrite the CicularBuffer to reduce logservice overhead
c4a8214 is described below

commit c4a8214db373581ddeae011d285e05edd718f9b6
Author: Romain Manni-Bucau <rm...@gmail.com>
AuthorDate: Mon Sep 21 12:44:22 2020 +0200

    rewrite the CicularBuffer to reduce logservice overhead
    
    (cherry picked from commit 7021e024bc22ba32379cc667ab839422e9d58766)
---
 .../karaf/log/core/internal/CircularBuffer.java    | 112 +++++++++------------
 .../karaf/log/core/internal/LogServiceImpl.java    |  13 +--
 .../log/core/internal/CircularBufferTest.java      |  55 ++++++++++
 3 files changed, 109 insertions(+), 71 deletions(-)

diff --git a/log/src/main/java/org/apache/karaf/log/core/internal/CircularBuffer.java b/log/src/main/java/org/apache/karaf/log/core/internal/CircularBuffer.java
index c953b31..4b9da9f 100644
--- a/log/src/main/java/org/apache/karaf/log/core/internal/CircularBuffer.java
+++ b/log/src/main/java/org/apache/karaf/log/core/internal/CircularBuffer.java
@@ -18,92 +18,74 @@
  */
 package org.apache.karaf.log.core.internal;
 
-import java.lang.reflect.Array;
-import java.util.ArrayList;
+import org.ops4j.pax.logging.spi.PaxLoggingEvent;
+
 import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.function.IntFunction;
+import java.util.stream.IntStream;
+
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.toList;
 
 
 /**
- * An array that only keeps the last N elements added
+ * An array that only keeps the last N elements added.
+ * <p>
+ * It is expected to be written far more often than it is read (add vs getElements), since logs are appended
+ * continuously while queries should be quite rare, so we want to optimize the append path.
+ * <p>
+ * Important: a small inconsistency can occur between add() and getElements(), but because getElements()
+ * sorts the data this is harmless, and it avoids needing a lock in this buffer, which must keep a "0-overhead"
+ * on the runtime.
  */
-public class CircularBuffer<T> {
+public class CircularBuffer {
 
-    private T[] elements;
-    private transient int start;
-    private transient int end;
-    private transient boolean full;
-    private final int maxElements;
-    private Class<?> type;
+    private final AtomicInteger currentIdx = new AtomicInteger(0);
+    private final AtomicReferenceArray<PaxLoggingEvent> buffer;
 
-    public CircularBuffer(int size, Class<?> type) {
+    public CircularBuffer(int size) {
         if (size <= 0) {
             throw new IllegalArgumentException("The size must be greater than 0");
         }
-        this.type = type;
-        maxElements = size;
-        clear();
+        this.buffer = new AtomicReferenceArray<>(size);
     }
 
-    private int size() {
-        if (end == start) {
-            return full ? maxElements : 0;
-        } else if (end < start) {
-            return maxElements - start + end;
-        } else {
-            return end - start;
-        }
+    public int maxSize() {
+        return buffer.length();
     }
 
-    @SuppressWarnings("unchecked")
-    public synchronized void clear() {
-        start = 0;
-        end = 0;
-        full = false;
-        elements = (T[])Array.newInstance(type, maxElements);
-    }
-
-    public synchronized void add(T element) {
+    public void add(final PaxLoggingEvent element) {
         if (null == element) {
-             throw new NullPointerException("Attempted to add null object to buffer");
-        }
-        if (full) {
-            increaseStart();
-        }
-        elements[end] = element;
-        increaseEnd();
-        
-    }
-
-    private void increaseStart() {
-        start++;
-        if (start >= maxElements) {
-            start = 0;
+            throw new NullPointerException("Attempted to add null object to buffer");
         }
+        doAdd(element);
     }
 
-    private void increaseEnd() {
-        end++;
-        if (end >= maxElements) {
-            end = 0;
-        }
-        if (end == start) {
-            full = true;
-        }
+    public List<PaxLoggingEvent> getElements(final int requestedCount) {
+        final int max = Math.min(buffer.length(), requestedCount);
+        final int current = currentIdx.get() % buffer.length();
+        return collectEvents(max, idx -> buffer.get((current + idx) % buffer.length()));
     }
 
-    public synchronized Iterable<T> getElements() {
-        return getElements(size());
+    private List<PaxLoggingEvent> collectEvents(final int max, final IntFunction<PaxLoggingEvent> mapper) {
+        return IntStream.range(0, max)
+                .mapToObj(mapper)
+                .filter(Objects::nonNull) // not initialized yet
+                .sorted(comparing(PaxLoggingEvent::getTimeStamp)) // not critical but better when dumped
+                .collect(toList());
     }
 
-    public synchronized Iterable<T> getElements(int nb) {
-        int s = size();
-        nb = Math.min(Math.max(0, nb), s);
-        List<T> result = new ArrayList<T>();
-        for (int i = 0; i < nb; i++) {
-            result.add(elements[(i + s - nb + start) % maxElements]);
-        }
-        return result;
+    private void doAdd(final PaxLoggingEvent element) {
+        final int idx = currentIdx.getAndUpdate(value -> {
+            final int newValue = value + 1;
+            if (newValue >= buffer.length()) {
+                return 0;
+            }
+            return newValue;
+        }) % buffer.length();
+        buffer.set(idx, element);
     }
-
-
 }
diff --git a/log/src/main/java/org/apache/karaf/log/core/internal/LogServiceImpl.java b/log/src/main/java/org/apache/karaf/log/core/internal/LogServiceImpl.java
index d0c52fb..2fdfa32 100644
--- a/log/src/main/java/org/apache/karaf/log/core/internal/LogServiceImpl.java
+++ b/log/src/main/java/org/apache/karaf/log/core/internal/LogServiceImpl.java
@@ -34,14 +34,14 @@ public class LogServiceImpl implements LogService, PaxAppender {
     static final String CONFIGURATION_PID = "org.ops4j.pax.logging";
 
     private final ConfigurationAdmin configAdmin;
-    private final CircularBuffer<PaxLoggingEvent> buffer;
+    private volatile CircularBuffer buffer;
     private List<PaxAppender> appenders;
     
 
     public LogServiceImpl(ConfigurationAdmin configAdmin, int size) {
         this.configAdmin = configAdmin;
         this.appenders = new CopyOnWriteArrayList<>();
-        this.buffer = new CircularBuffer<>(size, PaxLoggingEvent.class);
+        this.buffer = new CircularBuffer(size);
     }
 
     private LogServiceInternal getDelegate(Dictionary<String, Object> config) {
@@ -126,7 +126,7 @@ public class LogServiceImpl implements LogService, PaxAppender {
 
     @Override
     public Iterable<PaxLoggingEvent> getEvents() {
-        return buffer.getElements();
+        return buffer.getElements(buffer.maxSize());
     }
 
     @Override
@@ -135,8 +135,9 @@ public class LogServiceImpl implements LogService, PaxAppender {
     }
 
     @Override
-    public void clearEvents() {
-        buffer.clear();
+    public void clearEvents() { // just reset the buffer, reduce the number of "write locked" operations in the buffer
+        final int size = this.buffer.maxSize();
+        this.buffer = new CircularBuffer(size);
     }
     
     @Override
@@ -172,7 +173,7 @@ public class LogServiceImpl implements LogService, PaxAppender {
     }
 
     @Override
-    public synchronized void doAppend(PaxLoggingEvent event) {
+    public void doAppend(PaxLoggingEvent event) {
         event.getProperties(); // ensure MDC properties are copied
         KarafLogEvent eventCopy = new KarafLogEvent(event);
         this.buffer.add(eventCopy);
diff --git a/log/src/test/java/org/apache/karaf/log/core/internal/CircularBufferTest.java b/log/src/test/java/org/apache/karaf/log/core/internal/CircularBufferTest.java
new file mode 100644
index 0000000..dfd1c1f
--- /dev/null
+++ b/log/src/test/java/org/apache/karaf/log/core/internal/CircularBufferTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.karaf.log.core.internal;
+
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.junit.Test;
+import org.ops4j.pax.logging.log4j2.internal.spi.PaxLoggingEventImpl;
+import org.ops4j.pax.logging.spi.PaxLoggingEvent;
+
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toList;
+import static org.junit.Assert.assertEquals;
+
+public class CircularBufferTest {
+    @Test
+    public void use() {
+        final CircularBuffer buffer = new CircularBuffer(3);
+        assertEquals(0, buffer.getElements(buffer.maxSize()).size());
+        final PaxLoggingEvent e1 = addAndAssertEvent(buffer);
+        final PaxLoggingEvent e2 = addAndAssertEvent(buffer, e1);
+        final PaxLoggingEvent e3 = addAndAssertEvent(buffer, e1, e2);
+        final PaxLoggingEvent e4 = addAndAssertEvent(buffer, e2, e3);
+        final PaxLoggingEvent e5 = addAndAssertEvent(buffer, e3, e4);
+        final PaxLoggingEvent e6 = addAndAssertEvent(buffer, e4, e5);
+        addAndAssertEvent(buffer, e5, e6);
+    }
+
+    private PaxLoggingEvent addAndAssertEvent(final CircularBuffer buffer, final PaxLoggingEvent... previous) {
+        final PaxLoggingEvent e4 = newEvent();
+        buffer.add(e4);
+        assertEquals(Stream.concat(Stream.of(previous), Stream.of(e4)).collect(toList()), buffer.getElements(buffer.maxSize()));
+        return e4;
+    }
+
+    private PaxLoggingEvent newEvent() {
+        return new PaxLoggingEventImpl(new Log4jLogEvent());
+    }
+}