You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2019/01/02 15:58:51 UTC

[aries-journaled-events] 02/02: ARIES-1878 - In memory implementation

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/aries-journaled-events.git

commit c5625d7943405be2c14bdfc2eb94b9dfa11ccc1c
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Wed Jan 2 16:58:40 2019 +0100

    ARIES-1878 - In memory implementation
---
 org.apache.aries.events.memory/pom.xml             |  20 +++
 .../aries/events/memory/InMemoryMessaging.java     |  48 +++++++
 .../org/apache/aries/events/memory/Journal.java    |  38 ++++++
 .../apache/aries/events/memory/MemoryMessage.java  |  27 ++++
 .../apache/aries/events/memory/MemoryPosition.java |  21 +++
 .../java/org/apache/aries/events/memory/Topic.java | 106 +++++++++++++++
 .../apache/aries/events/memory/MessagingTest.java  | 146 +++++++++++++++++++++
 pom.xml                                            |   1 +
 8 files changed, 407 insertions(+)

diff --git a/org.apache.aries.events.memory/pom.xml b/org.apache.aries.events.memory/pom.xml
new file mode 100644
index 0000000..f67f5d3
--- /dev/null
+++ b/org.apache.aries.events.memory/pom.xml
@@ -0,0 +1,20 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.aries.events</groupId>
+        <artifactId>org.apache.aries.events</artifactId>
+        <version>0.1.0-SNAPSHOT</version>
+    </parent>
+    <groupId>org.apache.aries.events.memory</groupId>
+    <artifactId>org.apache.aries.events.memory</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.aries.events</groupId>
+            <artifactId>org.apache.aries.events.api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
new file mode 100644
index 0000000..c1ee055
--- /dev/null
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
@@ -0,0 +1,48 @@
+package org.apache.aries.events.memory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Messaging;
+import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.Received;
+import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.Subscription;
+import org.apache.aries.events.api.Type;
+import org.osgi.service.component.annotations.Component;
+
+/**
+ * {@link Messaging} implementation that keeps all topics and their messages in memory.
+ * Topics are created lazily the first time a topic name is used for sending or subscribing.
+ */
+@Component
+@Type("memory")
+public class InMemoryMessaging implements Messaging {
+    /** Topics by name; entries are created on demand by {@link #getOrCreate(String)}. */
+    private final Map<String, Topic> topics = new ConcurrentHashMap<>();
+
+    /**
+     * Appends the message to the named topic.
+     *
+     * @param topicName name of the topic, created if it does not exist yet
+     * @param message message to append
+     * @return position the topic assigned to the message
+     */
+    @Override
+    public Position send(String topicName, Message message) {
+        return getOrCreate(topicName).send(message);
+    }
+
+    /**
+     * Subscribes the callback to the named topic.
+     *
+     * @param topicName name of the topic, created if it does not exist yet
+     * @param position position to start from, or {@code null} to start according to {@code seek}
+     * @param seek where to start when no position is given
+     * @param callback receives each message together with its position
+     * @return handle that stops the subscription when closed
+     */
+    @Override
+    public Subscription subscribe(String topicName, Position position, Seek seek, Consumer<Received> callback) {
+        return getOrCreate(topicName).subscribe(position, seek, callback);
+    }
+
+    /**
+     * Creates a message that holds the given payload and properties.
+     */
+    @Override
+    public Message newMessage(byte[] payload, Map<String, String> props) {
+        return new MemoryMessage(payload, props);
+    }
+
+    /**
+     * Parses a position from its string form, which is the decimal offset.
+     *
+     * @param position decimal representation of the offset
+     * @return position for that offset
+     * @throws NumberFormatException if the string is not a parsable long
+     */
+    @Override
+    public Position positionFromString(String position) {
+        // Long.parseLong avoids the deprecated boxing constructor new Long(String).
+        long offset = Long.parseLong(position);
+        return new MemoryPosition(offset);
+    }
+
+    /** Returns the topic with the given name, atomically creating it if it is missing. */
+    private Topic getOrCreate(String topicName) {
+        return topics.computeIfAbsent(topicName, Topic::new);
+    }
+
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java
new file mode 100644
index 0000000..df1196c
--- /dev/null
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java
@@ -0,0 +1,38 @@
+package org.apache.aries.events.memory;
+
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class Journal<T> {
+    private AtomicLong nextOffset = new AtomicLong();
+    private ConcurrentNavigableMap<Long, T> messages = new ConcurrentSkipListMap<>();
+    
+    public long append(T message) {
+        Long offset = nextOffset.getAndIncrement();
+        messages.put(offset, message);
+        return offset;
+    }
+
+    public long getFirstOffset() {
+        try {
+            return messages.firstKey();
+        } catch (NoSuchElementException e) {
+            return 0;
+        }
+    }
+
+    public long getLastOffset() {
+        try {
+            return messages.lastKey();
+        } catch (NoSuchElementException e) {
+            return -1;
+        }
+    }
+
+    public Entry<Long, T> getNext(long offset) {
+        return this.messages.ceilingEntry(offset);
+    }
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryMessage.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryMessage.java
new file mode 100644
index 0000000..ef17d2c
--- /dev/null
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryMessage.java
@@ -0,0 +1,27 @@
+package org.apache.aries.events.memory;
+
+import java.util.Map;
+
+import org.apache.aries.events.api.Message;
+
+/**
+ * Simple {@link Message} that holds references to the payload and properties it was created with.
+ */
+class MemoryMessage implements Message {
+
+    private final byte[] payload;
+    private final Map<String, String> properties;
+
+    /**
+     * @param payload message body; stored as given, without copying
+     * @param props message properties; stored as given, without copying
+     */
+    MemoryMessage(byte[] payload, Map<String, String> props) {
+        this.properties = props;
+        this.payload = payload;
+    }
+
+    /** @return the payload array passed to the constructor */
+    @Override
+    public byte[] getPayload() {
+        return payload;
+    }
+
+    /** @return the properties map passed to the constructor */
+    @Override
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
new file mode 100644
index 0000000..b147c10
--- /dev/null
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
@@ -0,0 +1,21 @@
+package org.apache.aries.events.memory;
+
+import org.apache.aries.events.api.Position;
+
+/**
+ * {@link Position} backed by a numeric journal offset.
+ */
+class MemoryPosition implements Position {
+
+    private final long offset;
+
+    /**
+     * @param offset journal offset this position refers to
+     */
+    MemoryPosition(long offset) {
+        this.offset = offset;
+    }
+
+    /** @return the journal offset of this position */
+    public long getOffset() {
+        return offset;
+    }
+
+    /**
+     * @return the offset in decimal form
+     */
+    @Override
+    public String toString() {
+        // Long.toString avoids allocating a box through the deprecated new Long(long).
+        return Long.toString(offset);
+    }
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
new file mode 100644
index 0000000..3e969ac
--- /dev/null
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
@@ -0,0 +1,106 @@
+package org.apache.aries.events.memory;
+
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.Received;
+import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A single named in-memory topic: a {@link Journal} of messages plus the subscriptions
+ * that poll it.
+ */
+public class Topic {
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+    private final String topicName;
+    private final Journal<Message> journal;
+    // NOTE(review): nothing in this class adds to this set; close() only removes from it.
+    private final Set<Subscription> subscriptions = new HashSet<>();
+
+    /**
+     * @param topicName name of the topic, used to name the poller threads
+     */
+    public Topic(String topicName) {
+        this.topicName = topicName;
+        this.journal = new Journal<>();
+    }
+
+    /**
+     * Appends the message to the journal.
+     *
+     * @param message message to store
+     * @return position holding the offset the journal assigned to the message
+     */
+    public Position send(Message message) {
+        long offset = this.journal.append(message);
+        return new MemoryPosition(offset);
+    }
+
+    /**
+     * Starts a subscription that polls the journal on its own thread.
+     *
+     * @param position position to start from, or {@code null} to start according to {@code seek}
+     * @param seek where to start when {@code position} is {@code null}
+     * @param callback receives each message together with its position
+     * @return the running subscription; close it to stop polling
+     * @throws IllegalArgumentException if both {@code position} and {@code seek} are {@code null}
+     */
+    public Subscription subscribe(Position position, Seek seek, Consumer<Received> callback) {
+        long startOffset = getStartOffset(position, seek);
+        log.debug("Consuming from {}", startOffset);
+        return new TopicSubscription(startOffset, callback);
+    }
+
+    /**
+     * Resolves the first offset to deliver: an explicit position wins, otherwise
+     * {@code earliest} maps to the first stored offset and {@code latest} to the offset
+     * after the last stored one.
+     */
+    private long getStartOffset(Position position, Seek seek) {
+        if (position != null) {
+            return position.getOffset();
+        }
+        if (seek == Seek.earliest) {
+            return this.journal.getFirstOffset();
+        }
+        if (seek == Seek.latest) {
+            return this.journal.getLastOffset() + 1;
+        }
+        throw new IllegalArgumentException("Seek must not be null");
+    }
+
+    /**
+     * Subscription that delivers journal entries to a callback from a dedicated poller thread.
+     */
+    class TopicSubscription implements Subscription {
+        private final Consumer<Received> callback;
+        private final ExecutorService executor;
+        private volatile boolean running;
+        // Written by the constructor before the poller is submitted, afterwards only by the poller.
+        private long currentOffset;
+
+        TopicSubscription(long startOffset, Consumer<Received> callback) {
+            this.currentOffset = startOffset;
+            this.callback = callback;
+            this.running = true;
+            String name = "Poller for " + topicName;
+            this.executor = Executors.newSingleThreadExecutor(r -> new Thread(r, name));
+            this.executor.execute(this::poll);
+        }
+
+        /**
+         * Delivers entries in offset order until the subscription is closed or the poller
+         * thread is interrupted. Sleeps 100 ms whenever no entry is available.
+         */
+        private void poll() {
+            while (running) {
+                Entry<Long, Message> entry = journal.getNext(currentOffset);
+                if (entry == null) {
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException e) {
+                        // Treat interruption as a request to stop and keep the flag for the executor.
+                        Thread.currentThread().interrupt();
+                        return;
+                    }
+                    continue;
+                }
+                long offset = entry.getKey();
+                try {
+                    // Report the offset of the entry actually returned; getNext may skip ahead
+                    // of currentOffset, so the two are not guaranteed to be equal.
+                    MemoryPosition position = new MemoryPosition(offset);
+                    Received received = new Received(position, entry.getValue());
+                    callback.accept(received);
+                } catch (Exception e) {
+                    // A failing callback must not stop delivery of later messages.
+                    log.warn(e.getMessage(), e);
+                }
+                this.currentOffset = offset + 1;
+            }
+        }
+
+        /**
+         * Stops polling and waits up to 10 seconds for the poller thread to finish.
+         */
+        @Override
+        public void close() {
+            this.running = false;
+            executor.shutdown();
+            try {
+                executor.awaitTermination(10, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                // Stop waiting but leave the interrupt visible to the caller.
+                Thread.currentThread().interrupt();
+            }
+            subscriptions.remove(this);
+        }
+
+    }
+}
diff --git a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
new file mode 100644
index 0000000..a8262bd
--- /dev/null
+++ b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
@@ -0,0 +1,146 @@
+package org.apache.aries.events.memory;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Messaging;
+import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.Received;
+import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.Subscription;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+
+/**
+ * Exercises {@link InMemoryMessaging} through the {@link Messaging} API using a mocked callback.
+ */
+public class MessagingTest {
+
+    @Mock
+    private Consumer<Received> callback;
+
+    @Captor
+    private ArgumentCaptor<Received> messageCaptor;
+
+    /** Subscriptions opened by a test; closed again in {@link #after()}. */
+    private Set<Subscription> subscriptions = new HashSet<>();
+
+    private Messaging messaging;
+
+    @Before
+    public void before() {
+        initMocks(this);
+        messaging = new InMemoryMessaging();
+    }
+
+    @After
+    public void after() {
+        subscriptions.forEach(Subscription::close);
+    }
+
+    @Test
+    public void testPositionFromString() {
+        Position pos = messaging.positionFromString("1");
+        assertThat(pos.getOffset(), equalTo(1L));
+    }
+
+    @Test
+    public void testSend() {
+        subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+        String content = "testcontent";
+        Position pos = send("test", content);
+        assertThat(pos.toString(), equalTo("0"));
+
+        verify(callback, timeout(1000)).accept(messageCaptor.capture());
+        Received received = messageCaptor.getValue();
+        assertThat(received.getMessage().getPayload(), equalTo(toBytes(content)));
+        assertThat(received.getPosition().getOffset(), equalTo(0L));
+        assertThat(received.getMessage().getProperties().size(), equalTo(1));
+        assertThat(received.getMessage().getProperties().get("my"), equalTo("testvalue"));
+    }
+
+    /** Subscribing with neither a position nor a seek must be rejected. */
+    @Test(expected=IllegalArgumentException.class)
+    public void testInvalid() {
+        messaging.subscribe("test", null, null, callback);
+    }
+
+    /** Subscribes with earliest before anything is sent; both messages arrive. */
+    @Test
+    public void testEarliestBefore() {
+        subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+        send("test", "testcontent");
+        send("test", "testcontent2");
+        verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
+        assertThat(messageContents(), contains("testcontent", "testcontent2"));
+    }
+
+    /** Subscribes with earliest after the first send; the earlier message is still delivered. */
+    @Test
+    public void testEarliestAfter() {
+        send("test", "testcontent");
+        subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+        send("test", "testcontent2");
+        verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
+        assertThat(messageContents(), contains("testcontent", "testcontent2"));
+    }
+
+    /** Subscribes with latest on an empty topic; both later messages arrive. */
+    @Test
+    public void testLatestBefore() {
+        subscriptions.add(messaging.subscribe("test", null, Seek.latest, callback));
+        send("test", "testcontent");
+        send("test", "testcontent2");
+        verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
+        assertThat(messageContents(), contains("testcontent", "testcontent2"));
+    }
+
+    /** Subscribes with latest after the first send; only the later message arrives. */
+    @Test
+    public void testLatest() {
+        send("test", "testcontent");
+        subscriptions.add(messaging.subscribe("test", null, Seek.latest, callback));
+        send("test", "testcontent2");
+        verify(callback, timeout(1000)).accept(messageCaptor.capture());
+        assertThat(messageContents(), contains("testcontent2"));
+    }
+
+    /** An explicit position takes precedence over the seek argument. */
+    @Test
+    public void testFrom1() {
+        send("test", "testcontent");
+        send("test", "testcontent2");
+        subscriptions.add(messaging.subscribe("test", new MemoryPosition(1L), Seek.earliest, callback));
+        verify(callback, timeout(1000)).accept(messageCaptor.capture());
+        assertThat(messageContents(), contains("testcontent2"));
+    }
+
+    /** Payloads of all captured messages, decoded as UTF-8, in capture order. */
+    private List<String> messageContents() {
+        return messageCaptor.getAllValues().stream()
+                .map(this::getContent).collect(Collectors.toList());
+    }
+
+    private String getContent(Received rec) {
+        return new String(rec.getMessage().getPayload(), Charset.forName("UTF-8"));
+    }
+
+    /** Sends the content as UTF-8 bytes with the single property {@code my=testvalue}. */
+    private Position send(String topic, String content) {
+        Map<String, String> props = new HashMap<>();
+        props.put("my", "testvalue");
+        Message message = messaging.newMessage(toBytes(content), props);
+        return messaging.send(topic, message);
+    }
+
+    private byte[] toBytes(String content) {
+        return content.getBytes(Charset.forName("UTF-8"));
+    }
+
+}
diff --git a/pom.xml b/pom.xml
index d300cac..44c5c17 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,7 @@
 
     <modules>
         <module>org.apache.aries.events.api</module>
+        <module>org.apache.aries.events.memory</module>
     </modules>
 
     <properties>