You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2019/01/02 15:58:51 UTC
[aries-journaled-events] 02/02: ARIES-1878 - In memory
implementation
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/aries-journaled-events.git
commit c5625d7943405be2c14bdfc2eb94b9dfa11ccc1c
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Wed Jan 2 16:58:40 2019 +0100
ARIES-1878 - In memory implementation
---
org.apache.aries.events.memory/pom.xml | 20 +++
.../aries/events/memory/InMemoryMessaging.java | 48 +++++++
.../org/apache/aries/events/memory/Journal.java | 38 ++++++
.../apache/aries/events/memory/MemoryMessage.java | 27 ++++
.../apache/aries/events/memory/MemoryPosition.java | 21 +++
.../java/org/apache/aries/events/memory/Topic.java | 106 +++++++++++++++
.../apache/aries/events/memory/MessagingTest.java | 146 +++++++++++++++++++++
pom.xml | 1 +
8 files changed, 407 insertions(+)
diff --git a/org.apache.aries.events.memory/pom.xml b/org.apache.aries.events.memory/pom.xml
new file mode 100644
index 0000000..f67f5d3
--- /dev/null
+++ b/org.apache.aries.events.memory/pom.xml
@@ -0,0 +1,20 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.aries.events</groupId>
+ <artifactId>org.apache.aries.events</artifactId>
+ <version>0.1.0-SNAPSHOT</version>
+ </parent>
+ <groupId>org.apache.aries.events.memory</groupId>
+ <artifactId>org.apache.aries.events.memory</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.aries.events</groupId>
+ <artifactId>org.apache.aries.events.api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
new file mode 100644
index 0000000..c1ee055
--- /dev/null
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
@@ -0,0 +1,48 @@
+package org.apache.aries.events.memory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Messaging;
+import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.Received;
+import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.Subscription;
+import org.apache.aries.events.api.Type;
+import org.osgi.service.component.annotations.Component;
+
+@Component
+@Type("memory")
+public class InMemoryMessaging implements Messaging {
+ private Map<String, Topic> topics = new ConcurrentHashMap<>();
+
+ @Override
+ public Position send(String topicName, Message message) {
+ Topic topic = getOrCreate(topicName);
+ return topic.send(message);
+ }
+
+ @Override
+ public Subscription subscribe(String topicName, Position position, Seek seek, Consumer<Received> callback) {
+ Topic topic = getOrCreate(topicName);
+ return topic.subscribe(position, seek, callback);
+ }
+
+ @Override
+ public Message newMessage(byte[] payload, Map<String, String> props) {
+ return new MemoryMessage(payload, props);
+ }
+
+ @Override
+ public Position positionFromString(String position) {
+ long offset = new Long(position).longValue();
+ return new MemoryPosition(offset);
+ }
+
+ private Topic getOrCreate(String topicName) {
+ return topics.computeIfAbsent(topicName, topicName2 -> new Topic(topicName2));
+ }
+
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java
new file mode 100644
index 0000000..df1196c
--- /dev/null
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java
@@ -0,0 +1,38 @@
+package org.apache.aries.events.memory;
+
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class Journal<T> {
+ private AtomicLong nextOffset = new AtomicLong();
+ private ConcurrentNavigableMap<Long, T> messages = new ConcurrentSkipListMap<>();
+
+ public long append(T message) {
+ Long offset = nextOffset.getAndIncrement();
+ messages.put(offset, message);
+ return offset;
+ }
+
+ public long getFirstOffset() {
+ try {
+ return messages.firstKey();
+ } catch (NoSuchElementException e) {
+ return 0;
+ }
+ }
+
+ public long getLastOffset() {
+ try {
+ return messages.lastKey();
+ } catch (NoSuchElementException e) {
+ return -1;
+ }
+ }
+
+ public Entry<Long, T> getNext(long offset) {
+ return this.messages.ceilingEntry(offset);
+ }
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryMessage.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryMessage.java
new file mode 100644
index 0000000..ef17d2c
--- /dev/null
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryMessage.java
@@ -0,0 +1,27 @@
+package org.apache.aries.events.memory;
+
+import java.util.Map;
+
+import org.apache.aries.events.api.Message;
+
+class MemoryMessage implements Message {
+
+ private byte[] payload;
+ private Map<String, String> properties;
+
+ MemoryMessage(byte[] payload, Map<String, String> props) {
+ this.payload = payload;
+ properties = props;
+ }
+
+ @Override
+ public byte[] getPayload() {
+ return this.payload;
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ return this.properties;
+ }
+
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
new file mode 100644
index 0000000..b147c10
--- /dev/null
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
@@ -0,0 +1,21 @@
+package org.apache.aries.events.memory;
+
+import org.apache.aries.events.api.Position;
+
+class MemoryPosition implements Position {
+
+ private long offset;
+
+ MemoryPosition(long offset) {
+ this.offset = offset;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ @Override
+ public String toString() {
+ return new Long(offset).toString();
+ }
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
new file mode 100644
index 0000000..3e969ac
--- /dev/null
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
@@ -0,0 +1,106 @@
+package org.apache.aries.events.memory;
+
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.Received;
+import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Topic {
+ private Logger log = LoggerFactory.getLogger(this.getClass());
+
+ private String topicName;
+ private Journal<Message> journal;
+ private Set<Subscription> subscriptions = new HashSet<>();
+
+ public Topic(String topicName) {
+ this.topicName = topicName;
+ this.journal = new Journal<>();
+ }
+
+ public Position send(Message message) {
+ long offset = this.journal.append(message);
+ return new MemoryPosition(offset);
+ }
+
+ public Subscription subscribe(Position position, Seek seek, Consumer<Received> callback) {
+ long startOffset = getStartOffset(position, seek);
+ log.debug("Consuming from " + startOffset);
+ return new TopicSubscription(startOffset, callback);
+ }
+
+ private long getStartOffset(Position position, Seek seek) {
+ if (position != null) {
+ return position.getOffset();
+ } else {
+ if (seek == Seek.earliest) {
+ return this.journal.getFirstOffset();
+ } else if (seek == Seek.latest) {
+ return this.journal.getLastOffset() + 1;
+ } else {
+ throw new IllegalArgumentException("Seek must not be null");
+ }
+ }
+ }
+
+ class TopicSubscription implements Subscription {
+ private Consumer<Received> callback;
+ private ExecutorService executor;
+ private volatile boolean running;
+ private long currentOffset;
+
+ TopicSubscription(long startOffset, Consumer<Received> callback) {
+ this.currentOffset = startOffset;
+ this.callback = callback;
+ this.running = true;
+ String name = "Poller for " + topicName;
+ this.executor = Executors.newSingleThreadExecutor(r -> new Thread(r, name));
+ this.executor.execute(this::poll);
+ }
+
+ private void poll() {
+ while (running) {
+ Entry<Long, Message> entry = journal.getNext(currentOffset);
+ if (entry != null) {
+ long offset = entry.getKey();
+ try {
+ MemoryPosition position = new MemoryPosition(this.currentOffset);
+ Received received = new Received(position, entry.getValue());
+ callback.accept(received);
+ } catch (Exception e) {
+ log.warn(e.getMessage(), e);
+ }
+ this.currentOffset = offset + 1;
+ } else {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ this.running = false;
+ executor.shutdown();
+ try {
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ subscriptions.remove(this);
+ }
+
+ }
+}
diff --git a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
new file mode 100644
index 0000000..a8262bd
--- /dev/null
+++ b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
@@ -0,0 +1,146 @@
+package org.apache.aries.events.memory;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Messaging;
+import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.Received;
+import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.Subscription;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+
+public class MessagingTest {
+
+ @Mock
+ private Consumer<Received> callback;
+
+ @Captor
+ private ArgumentCaptor<Received> messageCaptor;
+
+ private Set<Subscription> subscriptions = new HashSet<>();
+
+ private Messaging messaging;
+
+ @Before
+ public void before() {
+ initMocks(this);
+ messaging = new InMemoryMessaging();
+ }
+
+ @After
+ public void after() {
+ subscriptions.forEach(Subscription::close);
+ }
+
+ @Test
+ public void testPositionFromString() {
+ Position pos = messaging.positionFromString("1");
+ assertThat(pos.getOffset(), equalTo(1l));
+ }
+
+ @Test
+ public void testSend() {
+ subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+ String content = "testcontent";
+ Position pos = send("test", content);
+ assertThat(pos.toString(), equalTo("0"));
+
+ verify(callback, timeout(1000)).accept(messageCaptor.capture());
+ Received received = messageCaptor.getValue();
+ assertThat(received.getMessage().getPayload(), equalTo(toBytes(content)));
+ assertThat(received.getPosition().getOffset(), equalTo(0l));
+ assertThat(received.getMessage().getProperties().size(), equalTo(1));
+ assertThat(received.getMessage().getProperties().get("my"), equalTo("testvalue"));
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testInvalid() {
+ messaging.subscribe("test", null, null, callback);
+ }
+
+ @Test
+ public void testEarliestBefore() {
+ subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+ send("test", "testcontent");
+ send("test", "testcontent2");
+ verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
+ assertThat(messageContents(), contains("testcontent", "testcontent2"));
+ }
+
+ @Test
+ public void testEarliestAfter() {
+ send("test", "testcontent");
+ subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+ send("test", "testcontent2");
+ verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
+ assertThat(messageContents(), contains("testcontent", "testcontent2"));
+ }
+
+ @Test
+ public void testLatestBefore() {
+ subscriptions.add(messaging.subscribe("test", null, Seek.latest, callback));
+ send("test", "testcontent");
+ send("test", "testcontent2");
+ verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
+ assertThat(messageContents(), contains("testcontent", "testcontent2"));
+ }
+
+ @Test
+ public void testLatest() {
+ send("test", "testcontent");
+ subscriptions.add(messaging.subscribe("test", null, Seek.latest, callback));
+ send("test", "testcontent2");
+ verify(callback, timeout(1000)).accept(messageCaptor.capture());
+ assertThat(messageContents(), contains("testcontent2"));
+ }
+
+ @Test
+ public void testFrom1() {
+ send("test", "testcontent");
+ send("test", "testcontent2");
+ subscriptions.add(messaging.subscribe("test", new MemoryPosition(1l), Seek.earliest, callback));
+ verify(callback, timeout(1000)).accept(messageCaptor.capture());
+ assertThat(messageContents(), contains("testcontent2"));
+ }
+
+ private List<String> messageContents() {
+ return messageCaptor.getAllValues().stream()
+ .map(this::getContent).collect(Collectors.toList());
+ }
+
+ private String getContent(Received rec) {
+ return new String(rec.getMessage().getPayload(), Charset.forName("UTF-8"));
+ }
+
+ private Position send(String topic, String content) {
+ Map<String, String> props = new HashMap<String, String>();
+ props.put("my", "testvalue");
+ Message message = messaging.newMessage(toBytes(content), props);
+ return messaging.send(topic, message);
+ }
+
+ private byte[] toBytes(String content) {
+ return content.getBytes(Charset.forName("UTF-8"));
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index d300cac..44c5c17 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,7 @@
<modules>
<module>org.apache.aries.events.api</module>
+ <module>org.apache.aries.events.memory</module>
</modules>
<properties>