You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2019/01/05 10:05:17 UTC

[aries-journaled-events] branch master updated: ARIES-1878 - Add eviction after a certain number of messages

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/aries-journaled-events.git


The following commit(s) were added to refs/heads/master by this push:
     new 42932be  ARIES-1878 - Add eviction after a certain number of messages
42932be is described below

commit 42932be35652c18969c9133b3aa25a9492a27e38
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Sat Jan 5 11:05:08 2019 +0100

    ARIES-1878 - Add eviction after a certain number of messages
---
 .../aries/events/memory/InMemoryMessaging.java     | 14 +++++++++++--
 .../org/apache/aries/events/memory/Journal.java    | 24 +++++++++++++++++++---
 .../java/org/apache/aries/events/memory/Topic.java | 12 +++++------
 .../apache/aries/events/memory/MessagingTest.java  | 19 +++++++++++++++++
 4 files changed, 58 insertions(+), 11 deletions(-)

diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
index 634887b..488e4f5 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
@@ -31,7 +31,17 @@ import org.osgi.service.component.annotations.Component;
 @Component
 @Type("memory")
 public class InMemoryMessaging implements Messaging {
-    private Map<String, Topic> topics = new ConcurrentHashMap<>();
+    private final Map<String, Topic> topics = new ConcurrentHashMap<>();
+    private final int keepAtLeast;
+    
+    public InMemoryMessaging() {
+        this(10000);
+    }
+
+    public InMemoryMessaging(int keepAtLeast) {
+        this.keepAtLeast = keepAtLeast;
+        
+    }
 
     @Override
     public void send(String topicName, Message message) {
@@ -57,7 +67,7 @@ public class InMemoryMessaging implements Messaging {
     }
 
     private Topic getOrCreate(String topicName) {
-        return topics.computeIfAbsent(topicName, Topic::new);
+        return topics.computeIfAbsent(topicName, topicName2 -> new Topic(topicName2, keepAtLeast));
     }
 
 }
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java
index 4fb5d0a..15c9585 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java
@@ -17,22 +17,40 @@
  */
 package org.apache.aries.events.memory;
 
+import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class Journal<T> {
-    private AtomicLong nextOffset = new AtomicLong();
-    private ConcurrentNavigableMap<Long, T> messages = new ConcurrentSkipListMap<>();
+class Journal<T> {
+    private final int keepAtLeast;
+    private final AtomicLong nextOffset = new AtomicLong();
+    private final ConcurrentNavigableMap<Long, T> messages = new ConcurrentSkipListMap<>();
+    private final AtomicLong count = new AtomicLong();
+    
+    public Journal(int keepAtLeast) {
+        this.keepAtLeast = keepAtLeast;
+    }
     
     public long append(T message) {
+        if (count.incrementAndGet() > keepAtLeast * 2) {
+            evict();
+        }
         Long offset = nextOffset.getAndIncrement();
         messages.put(offset, message);
         return offset;
     }
 
+    private synchronized void evict() {
+        Iterator<Long> it = messages.keySet().iterator();
+        for (int c = 0; c < keepAtLeast; c++) {
+            messages.remove(it.next());
+        }
+        count.set(0);
+    }
+
     public long getFirstOffset() {
         try {
             return messages.firstKey();
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
index d7703a9..90ab931 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
@@ -31,15 +31,15 @@ import org.apache.aries.events.api.Subscription;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class Topic {
-    private Logger log = LoggerFactory.getLogger(this.getClass());
+class Topic {
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
 
-    private String topicName;
-    private Journal<Message> journal;
+    private final String topicName;
+    private final Journal<Message> journal;
 
-    public Topic(String topicName) {
+    public Topic(String topicName, int keepAtLeast) {
         this.topicName = topicName;
-        this.journal = new Journal<>();
+        this.journal = new Journal<>(keepAtLeast);
     }
 
     public synchronized Position send(Message message) {
diff --git a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
index 89c39b4..c38bb41 100644
--- a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
+++ b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
@@ -1,6 +1,7 @@
 package org.apache.aries.events.memory;
 
 import static org.apache.aries.events.api.SubscribeRequest.to;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertEquals;
@@ -16,6 +17,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -36,6 +38,8 @@ import org.mockito.Mockito;
 
 public class MessagingTest {
     
+    private static final long MAX_MANY = 100000l;
+
     @Mock
     private Consumer<Received> callback;
     
@@ -135,6 +139,21 @@ public class MessagingTest {
         assertThat(messageContents(), contains("testcontent2"));
     }
     
+    @Test
+    public void testMany() {
+        AtomicLong count = new AtomicLong();
+        Consumer<Received> manyCallback = rec -> { count.incrementAndGet(); };
+        messaging.subscribe(to("test", manyCallback));
+        for (long c=0; c < MAX_MANY; c++) {
+            send("test", "content " + c);
+            if (c % 10000 == 0) {
+                System.out.println("Sending " + c);
+            }
+            
+        }
+        await().until(count::get, equalTo(MAX_MANY)); 
+    }
+    
     private void assertMessages(int num) {
         verify(callback, timeout(1000).times(num)).accept(messageCaptor.capture());
     }