You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2019/01/08 13:15:00 UTC

[aries-journaled-events] branch ARIES-1882 created (now 8423d35)

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a change to branch ARIES-1882
in repository https://gitbox.apache.org/repos/asf/aries-journaled-events.git.


      at 8423d35  ARIES-1882 - More scalable API

This branch includes the following new commits:

     new 8423d35  ARIES-1882 - More scalable API

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[aries-journaled-events] 01/01: ARIES-1882 - More scalable API

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch ARIES-1882
in repository https://gitbox.apache.org/repos/asf/aries-journaled-events.git

commit 8423d35fe016ceba5f5fb20ab5038a5105246e5a
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Tue Jan 8 14:14:33 2019 +0100

    ARIES-1882 - More scalable API
---
 .../java/org/apache/aries/events/api/Message.java  |  4 ++++
 .../org/apache/aries/events/api/Messaging.java     |  2 +-
 .../java/org/apache/aries/events/api/Position.java | 12 +++++++-----
 .../aries/events/api/SubscribeRequestBuilder.java  | 21 ++++++++++++++++-----
 .../org/apache/aries/events/api/TopicPosition.java |  5 +++++
 .../aries/events/memory/InMemoryMessaging.java     |  6 +++---
 .../apache/aries/events/memory/MemoryPosition.java |  8 ++++----
 .../aries/events/memory/MemoryTopicPosition.java   | 22 ++++++++++++++++++++++
 .../java/org/apache/aries/events/memory/Topic.java | 13 ++++++-------
 .../apache/aries/events/memory/MessagingTest.java  |  9 ++++-----
 .../apache/aries/events/mongo/MongoMessaging.java  |  3 ++-
 .../apache/aries/events/mongo/MongoPosition.java   | 16 +++++++++++-----
 12 files changed, 85 insertions(+), 36 deletions(-)

diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
index 529c980..9be6ddb 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
@@ -24,8 +24,12 @@ import static java.util.Collections.unmodifiableMap;
 
 /**
  * TODO If we allow wild card consumption then a message also needs a topic
+ * 
+ * The property key "key" is a special property. For systems that support sharding
+ * the key can be used to indirectly select the partition to be used.
  */
 public final class Message {
+    public static final String KEY = "key";
 
     private final byte[] payload;
     private final Map<String, String> properties;
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
index a88954c..f245228 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
@@ -46,6 +46,6 @@ public interface Messaging {
      * @param position
      * @return
      */
-    Position positionFromString(String position);
+    TopicPosition positionFromString(String position);
 
 }
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java
index e211638..0873bdf 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java
@@ -18,18 +18,20 @@
 package org.apache.aries.events.api;
 
 /**
- * Position in a the topic.
- * E.g. For a kafka implementation this would be a list of (partition, offset) as we do not support partitions 
- * this could simply be like an offset.
+ * Position of a message in a topic.
  *
- * The {@code Position} positions are ordered. The relative order between
+ * The positions are ordered. The relative order between
  * two positions can be computed by invoking {@code Comparable#compareTo}.
  * Comparing this position with a specified position will return a negative
  * integer, zero, or a positive integer as this position happened before,
  * happened concurrently, or happened after the specified position.
+ * 
+ * In systems that use sharding (like Apache Kafka) positions are only comparable 
+ * if they have the same key. 
  */
 public interface Position extends Comparable<Position> {
+    public static final String defaultKey = "default";
 
-    String positionToString();
+    String getKey();
 
 }
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequestBuilder.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequestBuilder.java
index 71cf838..5c8ebb0 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequestBuilder.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequestBuilder.java
@@ -1,6 +1,6 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
+ * Licensed to the Apache Software Foundation (ASF) under one
  * distributed with this work for additional information
  * regarding copyright ownership. The SF licenses this file
  * to you under the Apache License, Version 2.0 (the
@@ -49,7 +49,7 @@ public final class SubscribeRequestBuilder {
      * @param position in the topic to start consuming from
      * @return the updated subscribe request
      */
-    public SubscribeRequestBuilder startAt(Position position) {
+    public SubscribeRequestBuilder startAt(TopicPosition position) {
         this.subscribeRequest.position = position;
         return this;
     }
@@ -66,6 +66,11 @@ public final class SubscribeRequestBuilder {
         this.subscribeRequest.seek = requireNonNull(seek, "Seek must not be null");
         return this;
     }
+    
+    public SubscribeRequestBuilder groupId(String groupId) {
+        this.subscribeRequest.groupId = groupId;
+        return this;
+    }
 
     
     public SubscribeRequest build() {
@@ -75,9 +80,10 @@ public final class SubscribeRequestBuilder {
     public static class SubscribeRequest {
         private final String topic;
         private final Consumer<Received> callback;
-        private Position position;
+        private TopicPosition position;
         private Seek seek = Seek.latest;
-        
+        private String groupId;
+
         private SubscribeRequest(String topic, Consumer<Received> callback) {
             this.topic = topic;
             this.callback = callback;
@@ -87,7 +93,7 @@ public final class SubscribeRequestBuilder {
             return topic;
         }
         
-        public Position getPosition() {
+        public TopicPosition getPosition() {
             return position;
         }
         
@@ -98,5 +104,10 @@ public final class SubscribeRequestBuilder {
         public Consumer<Received> getCallback() {
             return callback;
         }
+        
+        public String getGroupId() {
+            return groupId;
+        }
     }
 }
+
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/TopicPosition.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/TopicPosition.java
new file mode 100644
index 0000000..9f00a2f
--- /dev/null
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/TopicPosition.java
@@ -0,0 +1,5 @@
+package org.apache.aries.events.api;
+
+public interface TopicPosition {
+    String topicPositionToString();
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
index 674864a..40f3cf8 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
@@ -22,10 +22,10 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.aries.events.api.Message;
 import org.apache.aries.events.api.Messaging;
-import org.apache.aries.events.api.Position;
 import org.apache.aries.events.api.SubscribeRequestBuilder;
 import org.apache.aries.events.api.SubscribeRequestBuilder.SubscribeRequest;
 import org.apache.aries.events.api.Subscription;
+import org.apache.aries.events.api.TopicPosition;
 import org.apache.aries.events.api.Type;
 import org.osgi.service.component.annotations.Component;
 
@@ -58,9 +58,9 @@ public class InMemoryMessaging implements Messaging {
     }
 
     @Override
-    public Position positionFromString(String position) {
+    public TopicPosition positionFromString(String position) {
         long offset = Long.parseLong(position);
-        return new MemoryPosition(offset);
+        return new MemoryTopicPosition(offset);
     }
 
     private Topic getOrCreate(String topicName) {
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
index fab1bea..bcc8f17 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
@@ -32,12 +32,12 @@ class MemoryPosition implements Position {
     }
 
     @Override
-    public String positionToString() {
-        return Long.toString(offset);
+    public int compareTo(Position p) {
+        return Long.compare(offset, ((MemoryPosition)p).offset);
     }
 
     @Override
-    public int compareTo(Position p) {
-        return Long.compare(offset, ((MemoryPosition)p).offset);
+    public String getKey() {
+        return "default";
     }
 }
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryTopicPosition.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryTopicPosition.java
new file mode 100644
index 0000000..1a1da40
--- /dev/null
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryTopicPosition.java
@@ -0,0 +1,22 @@
+package org.apache.aries.events.memory;
+
+import org.apache.aries.events.api.TopicPosition;
+
+public class MemoryTopicPosition implements TopicPosition {
+
+    private long offset;
+    
+    public MemoryTopicPosition(long offset) {
+        this.offset = offset;
+    }
+
+    @Override
+    public String topicPositionToString() {
+            return Long.toString(offset);
+    }
+
+    public long getOffset() {
+        return offset;
+    }
+
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
index 91a6d71..242f39e 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
@@ -23,7 +23,6 @@ import java.util.concurrent.Executors;
 import java.util.function.Consumer;
 
 import org.apache.aries.events.api.Message;
-import org.apache.aries.events.api.Position;
 import org.apache.aries.events.api.Received;
 import org.apache.aries.events.api.Seek;
 import org.apache.aries.events.api.SubscribeRequestBuilder.SubscribeRequest;
@@ -42,19 +41,18 @@ class Topic {
         this.journal = new Journal<>(keepAtLeast);
     }
 
-    public synchronized Position send(Message message) {
-        long offset = this.journal.append(message);
+    public synchronized void send(Message message) {
+        this.journal.append(message);
         notifyAll();
-        return new MemoryPosition(offset);
     }
 
     public Subscription subscribe(SubscribeRequest request) {
-        long startOffset = getStartOffset((MemoryPosition) request.getPosition(), request.getSeek());
+        long startOffset = getStartOffset((MemoryTopicPosition) request.getPosition(), request.getSeek());
         log.debug("Consuming from " + startOffset);
         return new TopicSubscription(startOffset, request.getCallback());
     }
 
-    private long getStartOffset(MemoryPosition position, Seek seek) {
+    private long getStartOffset(MemoryTopicPosition position, Seek seek) {
         if (position != null) {
             return position.getOffset();
         } else {
@@ -105,8 +103,9 @@ class Topic {
         private void handleMessage(Entry<Long, Message> entry) {
             long offset = entry.getKey();
             try {
+                Message message = entry.getValue();
                 MemoryPosition position = new MemoryPosition(this.currentOffset);
-                Received received = new Received(position, entry.getValue());
+                Received received = new Received(position, message);
                 callback.accept(received);
             } catch (Exception e) {
                 log.warn(e.getMessage(), e);
diff --git a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
index 5edee5d..55d0fa7 100644
--- a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
+++ b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
@@ -23,7 +23,6 @@ import java.util.stream.Collectors;
 
 import org.apache.aries.events.api.Message;
 import org.apache.aries.events.api.Messaging;
-import org.apache.aries.events.api.Position;
 import org.apache.aries.events.api.Received;
 import org.apache.aries.events.api.Seek;
 import org.apache.aries.events.api.SubscribeRequestBuilder;
@@ -63,9 +62,9 @@ public class MessagingTest {
     
     @Test
     public void testPositionFromString() {
-        Position pos = messaging.positionFromString("1");
-        assertThat(pos.compareTo(new MemoryPosition(1)), equalTo(0));
-        assertThat(pos.positionToString(), equalTo("1"));
+        MemoryTopicPosition pos = (MemoryTopicPosition) messaging.positionFromString("1");
+        assertThat(pos.getOffset(), equalTo(1l));
+        assertThat(pos.topicPositionToString(), equalTo("1"));
     }
     
     @Test
@@ -134,7 +133,7 @@ public class MessagingTest {
     public void testFrom1() {
         send("test", "testcontent");
         send("test", "testcontent2");
-        subscribe(to("test", callback).startAt(new MemoryPosition(1l)).seek(Seek.earliest));
+        subscribe(to("test", callback).startAt(new MemoryTopicPosition(1l)).seek(Seek.earliest));
         assertMessages(1);
         assertThat(messageContents(), contains("testcontent2"));
     }
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoMessaging.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoMessaging.java
index 454efde..f1fa6ca 100644
--- a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoMessaging.java
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoMessaging.java
@@ -28,6 +28,7 @@ import org.apache.aries.events.api.Position;
 import org.apache.aries.events.api.SubscribeRequestBuilder;
 import org.apache.aries.events.api.SubscribeRequestBuilder.SubscribeRequest;
 import org.apache.aries.events.api.Subscription;
+import org.apache.aries.events.api.TopicPosition;
 import org.bson.Document;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -64,7 +65,7 @@ public class MongoMessaging implements Messaging {
     }
 
     @Override
-    public Position positionFromString(String position) {
+    public TopicPosition positionFromString(String position) {
         long index = Long.parseLong(position);
         return position(index);
     }
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoPosition.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoPosition.java
index 910b15a..529e444 100644
--- a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoPosition.java
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoPosition.java
@@ -19,23 +19,29 @@
 package org.apache.aries.events.mongo;
 
 import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.TopicPosition;
 
-class MongoPosition implements Position {
+class MongoPosition implements Position, TopicPosition {
 
-    static Position position(long index) {
+    static MongoPosition position(long index) {
         return new MongoPosition(index);
     }
 
-    static long index(Position position) {
+    static long index(TopicPosition position) {
         return ((MongoPosition) position).index;
     }
-
+    
     @Override
-    public String positionToString() {
+    public String topicPositionToString() {
         return String.valueOf(index);
     }
 
     @Override
+    public String getKey() {
+        return Position.defaultKey;
+    }
+
+    @Override
     public int compareTo(Position o) {
         long thatIndex = ((MongoPosition) o).index;
         if (this.index > thatIndex) return 1;