You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2019/01/02 15:58:49 UTC

[aries-journaled-events] branch master updated (530611c -> c5625d7)

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/aries-journaled-events.git.


    from 530611c  Remove TopicPosition as we do not support partitions
     new f5a98ce  ARIES-1879 - Create Received to avoid null Position. Move positionToString to Position. Add Position.getOffset
     new c5625d7  ARIES-1878 - In memory implementation

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/aries/events/api/Message.java  |   5 -
 .../org/apache/aries/events/api/Messaging.java     |  11 +-
 .../java/org/apache/aries/events/api/Position.java |   2 +-
 .../java/org/apache/aries/events/api/Received.java |  19 +++
 .../java/org/apache/aries/events/api/Type.java     |   5 +
 org.apache.aries.events.memory/pom.xml             |  20 +++
 .../aries/events/memory/InMemoryMessaging.java     |  48 +++++++
 .../org/apache/aries/events/memory/Journal.java    |  38 ++++++
 .../apache/aries/events/memory/MemoryMessage.java  |  27 ++++
 .../apache/aries/events/memory/MemoryPosition.java |  21 +++
 .../java/org/apache/aries/events/memory/Topic.java | 106 +++++++++++++++
 .../apache/aries/events/memory/MessagingTest.java  | 146 +++++++++++++++++++++
 pom.xml                                            |   1 +
 13 files changed, 434 insertions(+), 15 deletions(-)
 create mode 100644 org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Received.java
 create mode 100644 org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Type.java
 create mode 100644 org.apache.aries.events.memory/pom.xml
 create mode 100644 org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
 create mode 100644 org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java
 create mode 100644 org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryMessage.java
 create mode 100644 org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
 create mode 100644 org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
 create mode 100644 org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java


[aries-journaled-events] 02/02: ARIES-1878 - In memory implementation

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/aries-journaled-events.git

commit c5625d7943405be2c14bdfc2eb94b9dfa11ccc1c
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Wed Jan 2 16:58:40 2019 +0100

    ARIES-1878 - In memory implementation
---
 org.apache.aries.events.memory/pom.xml             |  20 +++
 .../aries/events/memory/InMemoryMessaging.java     |  48 +++++++
 .../org/apache/aries/events/memory/Journal.java    |  38 ++++++
 .../apache/aries/events/memory/MemoryMessage.java  |  27 ++++
 .../apache/aries/events/memory/MemoryPosition.java |  21 +++
 .../java/org/apache/aries/events/memory/Topic.java | 106 +++++++++++++++
 .../apache/aries/events/memory/MessagingTest.java  | 146 +++++++++++++++++++++
 pom.xml                                            |   1 +
 8 files changed, 407 insertions(+)

diff --git a/org.apache.aries.events.memory/pom.xml b/org.apache.aries.events.memory/pom.xml
new file mode 100644
index 0000000..f67f5d3
--- /dev/null
+++ b/org.apache.aries.events.memory/pom.xml
@@ -0,0 +1,20 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.aries.events</groupId>
+        <artifactId>org.apache.aries.events</artifactId>
+        <version>0.1.0-SNAPSHOT</version>
+    </parent>
+    <groupId>org.apache.aries.events.memory</groupId>
+    <artifactId>org.apache.aries.events.memory</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.aries.events</groupId>
+            <artifactId>org.apache.aries.events.api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
new file mode 100644
index 0000000..c1ee055
--- /dev/null
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
@@ -0,0 +1,48 @@
+package org.apache.aries.events.memory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Messaging;
+import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.Received;
+import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.Subscription;
+import org.apache.aries.events.api.Type;
+import org.osgi.service.component.annotations.Component;
+
+@Component
+@Type("memory")
+public class InMemoryMessaging implements Messaging {
+    private Map<String, Topic> topics = new ConcurrentHashMap<>();
+
+    @Override
+    public Position send(String topicName, Message message) {
+        Topic topic = getOrCreate(topicName);
+        return topic.send(message);
+    }
+
+    @Override
+    public Subscription subscribe(String topicName, Position position, Seek seek, Consumer<Received> callback) {
+        Topic topic = getOrCreate(topicName);
+        return topic.subscribe(position, seek, callback);
+    }
+
+    @Override
+    public Message newMessage(byte[] payload, Map<String, String> props) {
+        return new MemoryMessage(payload, props);
+    }
+
+    @Override
+    public Position positionFromString(String position) {
+        long offset = new Long(position).longValue();
+        return new MemoryPosition(offset);
+    }
+
+    private Topic getOrCreate(String topicName) {
+        return topics.computeIfAbsent(topicName, topicName2 -> new Topic(topicName2));
+    }
+
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java
new file mode 100644
index 0000000..df1196c
--- /dev/null
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java
@@ -0,0 +1,38 @@
+package org.apache.aries.events.memory;
+
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class Journal<T> {
+    private AtomicLong nextOffset = new AtomicLong();
+    private ConcurrentNavigableMap<Long, T> messages = new ConcurrentSkipListMap<>();
+    
+    public long append(T message) {
+        Long offset = nextOffset.getAndIncrement();
+        messages.put(offset, message);
+        return offset;
+    }
+
+    public long getFirstOffset() {
+        try {
+            return messages.firstKey();
+        } catch (NoSuchElementException e) {
+            return 0;
+        }
+    }
+
+    public long getLastOffset() {
+        try {
+            return messages.lastKey();
+        } catch (NoSuchElementException e) {
+            return -1;
+        }
+    }
+
+    public Entry<Long, T> getNext(long offset) {
+        return this.messages.ceilingEntry(offset);
+    }
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryMessage.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryMessage.java
new file mode 100644
index 0000000..ef17d2c
--- /dev/null
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryMessage.java
@@ -0,0 +1,27 @@
+package org.apache.aries.events.memory;
+
+import java.util.Map;
+
+import org.apache.aries.events.api.Message;
+
+class MemoryMessage implements Message {
+
+    private byte[] payload;
+    private Map<String, String> properties;
+
+    MemoryMessage(byte[] payload, Map<String, String> props) {
+        this.payload = payload;
+        properties = props;
+    }
+
+    @Override
+    public byte[] getPayload() {
+        return this.payload;
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return this.properties;
+    }
+
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
new file mode 100644
index 0000000..b147c10
--- /dev/null
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
@@ -0,0 +1,21 @@
+package org.apache.aries.events.memory;
+
+import org.apache.aries.events.api.Position;
+
+class MemoryPosition implements Position {
+
+    private long offset;
+
+    MemoryPosition(long offset) {
+        this.offset = offset;
+    }
+
+    public long getOffset() {
+        return offset;
+    }
+
+    @Override
+    public String toString() {
+        return new Long(offset).toString();
+    }
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
new file mode 100644
index 0000000..3e969ac
--- /dev/null
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
@@ -0,0 +1,106 @@
+package org.apache.aries.events.memory;
+
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.Received;
+import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Topic {
+    private Logger log = LoggerFactory.getLogger(this.getClass());
+
+    private String topicName;
+    private Journal<Message> journal;
+    private Set<Subscription> subscriptions = new HashSet<>();
+
+    public Topic(String topicName) {
+        this.topicName = topicName;
+        this.journal = new Journal<>();
+    }
+
+    public Position send(Message message) {
+        long offset = this.journal.append(message);
+        return new MemoryPosition(offset);
+    }
+
+    public Subscription subscribe(Position position, Seek seek, Consumer<Received> callback) {
+        long startOffset = getStartOffset(position, seek);
+        log.debug("Consuming from " + startOffset);
+        return new TopicSubscription(startOffset, callback);
+    }
+
+    private long getStartOffset(Position position, Seek seek) {
+        if (position != null) {
+            return position.getOffset();
+        } else {
+            if (seek == Seek.earliest) {
+                return this.journal.getFirstOffset();
+            } else if (seek == Seek.latest) {
+                return this.journal.getLastOffset() + 1;
+            } else {
+                throw new IllegalArgumentException("Seek must not be null");
+            }
+        }
+    }
+
+    class TopicSubscription implements Subscription {
+        private Consumer<Received> callback;
+        private ExecutorService executor;
+        private volatile boolean running;
+        private long currentOffset;
+
+        TopicSubscription(long startOffset, Consumer<Received> callback) {
+            this.currentOffset = startOffset;
+            this.callback = callback;
+            this.running = true;
+            String name = "Poller for " + topicName;
+            this.executor = Executors.newSingleThreadExecutor(r -> new Thread(r, name));
+            this.executor.execute(this::poll);
+        }
+        
+        private void poll() {
+            while (running) {
+                Entry<Long, Message> entry = journal.getNext(currentOffset);
+                if (entry != null) {
+                    long offset = entry.getKey();
+                    try {
+                        MemoryPosition position = new MemoryPosition(this.currentOffset);
+                        Received received = new Received(position, entry.getValue());
+                        callback.accept(received);
+                    } catch (Exception e) {
+                        log.warn(e.getMessage(), e);
+                    }
+                    this.currentOffset = offset + 1;
+                } else {
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException e) {
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void close() {
+            this.running = false;
+            executor.shutdown();
+            try {
+                executor.awaitTermination(10, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                // Ignore
+            }
+            subscriptions.remove(this);
+        }
+
+    }
+}
diff --git a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
new file mode 100644
index 0000000..a8262bd
--- /dev/null
+++ b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
@@ -0,0 +1,146 @@
+package org.apache.aries.events.memory;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Messaging;
+import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.Received;
+import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.Subscription;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+
+public class MessagingTest {
+    
+    @Mock
+    private Consumer<Received> callback;
+    
+    @Captor
+    private ArgumentCaptor<Received> messageCaptor;
+
+    private Set<Subscription> subscriptions = new HashSet<>();
+
+    private Messaging messaging;
+    
+    @Before
+    public void before() {
+        initMocks(this);
+        messaging = new InMemoryMessaging();
+    }
+    
+    @After
+    public void after() {
+        subscriptions.forEach(Subscription::close);
+    }
+    
+    @Test
+    public void testPositionFromString() {
+        Position pos = messaging.positionFromString("1");
+        assertThat(pos.getOffset(), equalTo(1l));
+    }
+    
+    @Test
+    public void testSend() {
+        subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+        String content = "testcontent";
+        Position pos = send("test", content);
+        assertThat(pos.toString(), equalTo("0"));
+        
+        verify(callback, timeout(1000)).accept(messageCaptor.capture());
+        Received received = messageCaptor.getValue();
+        assertThat(received.getMessage().getPayload(), equalTo(toBytes(content)));
+        assertThat(received.getPosition().getOffset(), equalTo(0l));
+        assertThat(received.getMessage().getProperties().size(), equalTo(1));
+        assertThat(received.getMessage().getProperties().get("my"), equalTo("testvalue"));
+    }
+    
+    @Test(expected=IllegalArgumentException.class)
+    public void testInvalid() {
+        messaging.subscribe("test", null, null, callback);
+    }
+
+    @Test
+    public void testEarliestBefore() {
+        subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+        send("test", "testcontent");
+        send("test", "testcontent2");
+        verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
+        assertThat(messageContents(), contains("testcontent", "testcontent2"));
+    }
+    
+    @Test
+    public void testEarliestAfter() {
+        send("test", "testcontent");
+        subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+        send("test", "testcontent2");
+        verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
+        assertThat(messageContents(), contains("testcontent", "testcontent2"));
+    }
+    
+    @Test
+    public void testLatestBefore() {
+        subscriptions.add(messaging.subscribe("test", null, Seek.latest, callback));
+        send("test", "testcontent");
+        send("test", "testcontent2");
+        verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
+        assertThat(messageContents(), contains("testcontent", "testcontent2"));
+    }
+    
+    @Test
+    public void testLatest() {
+        send("test", "testcontent");
+        subscriptions.add(messaging.subscribe("test", null, Seek.latest, callback));
+        send("test", "testcontent2");
+        verify(callback, timeout(1000)).accept(messageCaptor.capture());
+        assertThat(messageContents(), contains("testcontent2"));
+    }
+    
+    @Test
+    public void testFrom1() {
+        send("test", "testcontent");
+        send("test", "testcontent2");
+        subscriptions.add(messaging.subscribe("test", new MemoryPosition(1l), Seek.earliest, callback));
+        verify(callback, timeout(1000)).accept(messageCaptor.capture());
+        assertThat(messageContents(), contains("testcontent2"));
+    }
+
+    private List<String> messageContents() {
+        return messageCaptor.getAllValues().stream()
+                .map(this::getContent).collect(Collectors.toList());
+    }
+    
+    private String getContent(Received rec) {
+        return new String(rec.getMessage().getPayload(), Charset.forName("UTF-8"));
+    }
+    
+    private Position send(String topic, String content) {
+        Map<String, String> props = new HashMap<String, String>();
+        props.put("my", "testvalue");
+        Message message = messaging.newMessage(toBytes(content), props);
+        return messaging.send(topic, message);
+    }
+
+    private byte[] toBytes(String content) {
+        return content.getBytes(Charset.forName("UTF-8"));
+    }
+    
+}
diff --git a/pom.xml b/pom.xml
index d300cac..44c5c17 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,7 @@
 
     <modules>
         <module>org.apache.aries.events.api</module>
+        <module>org.apache.aries.events.memory</module>
     </modules>
 
     <properties>


[aries-journaled-events] 01/02: ARIES-1879 - Create Received to avoid null Position. Move positionToString to Position. Add Position.getOffset

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/aries-journaled-events.git

commit f5a98ce8253d202ea3355afd951448782770fd25
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Wed Jan 2 16:58:08 2019 +0100

    ARIES-1879 - Create Received to avoid null Position. Move positionToString to Position. Add Position.getOffset
---
 .../java/org/apache/aries/events/api/Message.java     |  5 -----
 .../java/org/apache/aries/events/api/Messaging.java   | 11 ++---------
 .../java/org/apache/aries/events/api/Position.java    |  2 +-
 .../java/org/apache/aries/events/api/Received.java    | 19 +++++++++++++++++++
 .../main/java/org/apache/aries/events/api/Type.java   |  5 +++++
 5 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
index 9dd2df3..72ad65c 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
@@ -8,10 +8,5 @@ import java.util.Map;
 public interface Message {
     byte[] getPayload();
     
-    /**
-     * Position of the message in the topic
-     * @return
-     */
-    Position getPosition();
     Map<String, String> getProperties();
 }
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
index e2f9308..1e6f168 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
@@ -11,7 +11,7 @@ public interface Messaging {
      * Send a message to a topic. When this method returns the message 
      * is safely persisted.
      */
-    void send(String topic, Message message);
+    Position send(String topic, Message message);
 
     /**
      * Subscribe to a topic. The callback is called for each message received.
@@ -22,7 +22,7 @@ public interface Messaging {
      * @param callback will be called for each message received
      * @return Returned subscription must be closed by the caller to unsubscribe
      */
-    Subscription subscribe(String topic, Position position, Seek seek, Consumer<Message> callback);
+    Subscription subscribe(String topic, Position position, Seek seek, Consumer<Received> callback);
 
     /**
      * Create a message with payload and metadata
@@ -40,11 +40,4 @@ public interface Messaging {
      */
     Position positionFromString(String position);
 
-    /**
-     * Serialize position for storage
-     * 
-     * @param position
-     * @return
-     */
-    String positionToString(Position position);
 }
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java
index 7a85039..82d33dc 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java
@@ -7,5 +7,5 @@ package org.apache.aries.events.api;
  * TODO How do we provide ordering without being too specific?
  */
 public interface Position {
-    
+    long getOffset();
 }
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Received.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Received.java
new file mode 100644
index 0000000..3538be7
--- /dev/null
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Received.java
@@ -0,0 +1,19 @@
+package org.apache.aries.events.api;
+
+public final class Received {
+    private Position position;
+    private Message message;
+    
+    public Received(Position position, Message message) {
+        this.position = position;
+        this.message = message;
+    }
+    
+    public Position getPosition() {
+        return position;
+    }
+    
+    public Message getMessage() {
+        return message;
+    }
+}
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Type.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Type.java
new file mode 100644
index 0000000..6cccebb
--- /dev/null
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Type.java
@@ -0,0 +1,5 @@
+package org.apache.aries.events.api;
+
+public @interface Type {
+    String value();
+}