You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2019/01/05 10:05:17 UTC
[aries-journaled-events] branch master updated: ARIES-1878 - Add
eviction after a certain number of messages
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/aries-journaled-events.git
The following commit(s) were added to refs/heads/master by this push:
new 42932be ARIES-1878 - Add eviction after a certain number of messages
42932be is described below
commit 42932be35652c18969c9133b3aa25a9492a27e38
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Sat Jan 5 11:05:08 2019 +0100
ARIES-1878 - Add eviction after a certain number of messages
---
.../aries/events/memory/InMemoryMessaging.java | 14 +++++++++++--
.../org/apache/aries/events/memory/Journal.java | 24 +++++++++++++++++++---
.../java/org/apache/aries/events/memory/Topic.java | 12 +++++------
.../apache/aries/events/memory/MessagingTest.java | 19 +++++++++++++++++
4 files changed, 58 insertions(+), 11 deletions(-)
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
index 634887b..488e4f5 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
@@ -31,7 +31,17 @@ import org.osgi.service.component.annotations.Component;
@Component
@Type("memory")
public class InMemoryMessaging implements Messaging {
- private Map<String, Topic> topics = new ConcurrentHashMap<>();
+ private final Map<String, Topic> topics = new ConcurrentHashMap<>();
+ private final int keepAtLeast;
+
+ public InMemoryMessaging() {
+ this(10000);
+ }
+
+ public InMemoryMessaging(int keepAtLeast) {
+ this.keepAtLeast = keepAtLeast;
+
+ }
@Override
public void send(String topicName, Message message) {
@@ -57,7 +67,7 @@ public class InMemoryMessaging implements Messaging {
}
private Topic getOrCreate(String topicName) {
- return topics.computeIfAbsent(topicName, Topic::new);
+ return topics.computeIfAbsent(topicName, topicName2 -> new Topic(topicName2, keepAtLeast));
}
}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java
index 4fb5d0a..15c9585 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java
@@ -17,22 +17,40 @@
*/
package org.apache.aries.events.memory;
+import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
-public class Journal<T> {
- private AtomicLong nextOffset = new AtomicLong();
- private ConcurrentNavigableMap<Long, T> messages = new ConcurrentSkipListMap<>();
+class Journal<T> {
+ private final int keepAtLeast;
+ private final AtomicLong nextOffset = new AtomicLong();
+ private final ConcurrentNavigableMap<Long, T> messages = new ConcurrentSkipListMap<>();
+ private final AtomicLong count = new AtomicLong();
+
+ public Journal(int keepAtLeast) {
+ this.keepAtLeast = keepAtLeast;
+ }
public long append(T message) {
+ if (count.incrementAndGet() > 2L * keepAtLeast) { // long arithmetic: keepAtLeast * 2 would wrap negative above Integer.MAX_VALUE / 2
+ evict();
+ }
Long offset = nextOffset.getAndIncrement();
messages.put(offset, message);
return offset;
}
+ private synchronized void evict() { // drops up to keepAtLeast entries, starting from the lowest offset (oldest first)
+ Iterator<Long> it = messages.keySet().iterator();
+ for (int c = 0; c < keepAtLeast && it.hasNext(); c++) { // hasNext() guard: stop at the end of the journal instead of throwing NoSuchElementException
+ messages.remove(it.next());
+ }
+ count.set(messages.size() + 1L); // retained entries plus the message append() stores next; resetting to 0 let the journal grow by keepAtLeast + 1 per cycle
+ }
+
public long getFirstOffset() {
try {
return messages.firstKey();
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
index d7703a9..90ab931 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
@@ -31,15 +31,15 @@ import org.apache.aries.events.api.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class Topic {
- private Logger log = LoggerFactory.getLogger(this.getClass());
+class Topic {
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
- private String topicName;
- private Journal<Message> journal;
+ private final String topicName;
+ private final Journal<Message> journal;
- public Topic(String topicName) {
+ public Topic(String topicName, int keepAtLeast) {
this.topicName = topicName;
- this.journal = new Journal<>();
+ this.journal = new Journal<>(keepAtLeast);
}
public synchronized Position send(Message message) {
diff --git a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
index 89c39b4..c38bb41 100644
--- a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
+++ b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
@@ -1,6 +1,7 @@
package org.apache.aries.events.memory;
import static org.apache.aries.events.api.SubscribeRequest.to;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertEquals;
@@ -16,6 +17,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -36,6 +38,8 @@ import org.mockito.Mockito;
public class MessagingTest {
+ private static final long MAX_MANY = 100000l;
+
@Mock
private Consumer<Received> callback;
@@ -135,6 +139,21 @@ public class MessagingTest {
assertThat(messageContents(), contains("testcontent2"));
}
+ @Test
+ public void testMany() {
+ AtomicLong count = new AtomicLong();
+ Consumer<Received> manyCallback = rec -> { count.incrementAndGet(); };
+ messaging.subscribe(to("test", manyCallback));
+ for (long c=0; c < MAX_MANY; c++) {
+ send("test", "content " + c);
+ if (c % 10000 == 0) {
+ System.out.println("Sending " + c);
+ }
+
+ }
+ await().until(count::get, equalTo(MAX_MANY));
+ }
+
private void assertMessages(int num) {
verify(callback, timeout(1000).times(num)).accept(messageCaptor.capture());
}