You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2019/01/08 13:15:01 UTC

[aries-journaled-events] 01/01: ARIES-1882 - More scalable API

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch ARIES-1882
in repository https://gitbox.apache.org/repos/asf/aries-journaled-events.git

commit 8423d35fe016ceba5f5fb20ab5038a5105246e5a
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Tue Jan 8 14:14:33 2019 +0100

    ARIES-1882 - More scalable API
---
 .../java/org/apache/aries/events/api/Message.java  |  4 ++++
 .../org/apache/aries/events/api/Messaging.java     |  2 +-
 .../java/org/apache/aries/events/api/Position.java | 12 +++++++-----
 .../aries/events/api/SubscribeRequestBuilder.java  | 21 ++++++++++++++++-----
 .../org/apache/aries/events/api/TopicPosition.java |  5 +++++
 .../aries/events/memory/InMemoryMessaging.java     |  6 +++---
 .../apache/aries/events/memory/MemoryPosition.java |  8 ++++----
 .../aries/events/memory/MemoryTopicPosition.java   | 22 ++++++++++++++++++++++
 .../java/org/apache/aries/events/memory/Topic.java | 13 ++++++-------
 .../apache/aries/events/memory/MessagingTest.java  |  9 ++++-----
 .../apache/aries/events/mongo/MongoMessaging.java  |  3 ++-
 .../apache/aries/events/mongo/MongoPosition.java   | 16 +++++++++++-----
 12 files changed, 85 insertions(+), 36 deletions(-)

diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
index 529c980..9be6ddb 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
@@ -24,8 +24,12 @@ import static java.util.Collections.unmodifiableMap;
 
 /**
  * TODO If we allow wild card consumption then a message also needs a topic
+ * 
+ * The property key "key" is a special property. For systems that support sharding
+ * the key can be used to indirectly select the partition to be used.
  */
 public final class Message {
+    public static final String KEY = "key";
 
     private final byte[] payload;
     private final Map<String, String> properties;
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
index a88954c..f245228 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
@@ -46,6 +46,6 @@ public interface Messaging {
      * @param position
      * @return
      */
-    Position positionFromString(String position);
+    TopicPosition positionFromString(String position);
 
 }
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java
index e211638..0873bdf 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java
@@ -18,18 +18,20 @@
 package org.apache.aries.events.api;
 
 /**
- * Position in a the topic.
- * E.g. For a kafka implementation this would be a list of (partition, offset) as we do not support partitions 
- * this could simply be like an offset.
+ * Position of a message in a topic.
  *
- * The {@code Position} positions are ordered. The relative order between
+ * The positions are ordered. The relative order between
  * two positions can be computed by invoking {@code Comparable#compareTo}.
  * Comparing this position with a specified position will return a negative
  * integer, zero, or a positive integer as this position happened before,
  * happened concurrently, or happened after the specified position.
+ * 
+ * In systems that use sharding (like Apache Kafka) positions are only comparable 
+ * if they have the same key. 
  */
 public interface Position extends Comparable<Position> {
+    public static final String defaultKey = "default";
 
-    String positionToString();
+    String getKey();
 
 }
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequestBuilder.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequestBuilder.java
index 71cf838..5c8ebb0 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequestBuilder.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequestBuilder.java
@@ -1,6 +1,6 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
+ * Licensed to the Apache Software Foundation (ASF) under one
  * distributed with this work for additional information
  * regarding copyright ownership. The SF licenses this file
  * to you under the Apache License, Version 2.0 (the
@@ -49,7 +49,7 @@ public final class SubscribeRequestBuilder {
      * @param position in the topic to start consuming from
      * @return the updated subscribe request
      */
-    public SubscribeRequestBuilder startAt(Position position) {
+    public SubscribeRequestBuilder startAt(TopicPosition position) {
         this.subscribeRequest.position = position;
         return this;
     }
@@ -66,6 +66,11 @@ public final class SubscribeRequestBuilder {
         this.subscribeRequest.seek = requireNonNull(seek, "Seek must not be null");
         return this;
     }
+    
+    public SubscribeRequestBuilder groupId(String groupId) {
+        this.subscribeRequest.groupId = groupId;
+        return this;
+    }
 
     
     public SubscribeRequest build() {
@@ -75,9 +80,10 @@ public final class SubscribeRequestBuilder {
     public static class SubscribeRequest {
         private final String topic;
         private final Consumer<Received> callback;
-        private Position position;
+        private TopicPosition position;
         private Seek seek = Seek.latest;
-        
+        private String groupId;
+
         private SubscribeRequest(String topic, Consumer<Received> callback) {
             this.topic = topic;
             this.callback = callback;
@@ -87,7 +93,7 @@ public final class SubscribeRequestBuilder {
             return topic;
         }
         
-        public Position getPosition() {
+        public TopicPosition getPosition() {
             return position;
         }
         
@@ -98,5 +104,10 @@ public final class SubscribeRequestBuilder {
         public Consumer<Received> getCallback() {
             return callback;
         }
+        
+        public String getGroupId() {
+            return groupId;
+        }
     }
 }
+
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/TopicPosition.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/TopicPosition.java
new file mode 100644
index 0000000..9f00a2f
--- /dev/null
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/TopicPosition.java
@@ -0,0 +1,5 @@
+package org.apache.aries.events.api;
+
+public interface TopicPosition {
+    String topicPositionToString();
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
index 674864a..40f3cf8 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
@@ -22,10 +22,10 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.aries.events.api.Message;
 import org.apache.aries.events.api.Messaging;
-import org.apache.aries.events.api.Position;
 import org.apache.aries.events.api.SubscribeRequestBuilder;
 import org.apache.aries.events.api.SubscribeRequestBuilder.SubscribeRequest;
 import org.apache.aries.events.api.Subscription;
+import org.apache.aries.events.api.TopicPosition;
 import org.apache.aries.events.api.Type;
 import org.osgi.service.component.annotations.Component;
 
@@ -58,9 +58,9 @@ public class InMemoryMessaging implements Messaging {
     }
 
     @Override
-    public Position positionFromString(String position) {
+    public TopicPosition positionFromString(String position) {
         long offset = Long.parseLong(position);
-        return new MemoryPosition(offset);
+        return new MemoryTopicPosition(offset);
     }
 
     private Topic getOrCreate(String topicName) {
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
index fab1bea..bcc8f17 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
@@ -32,12 +32,12 @@ class MemoryPosition implements Position {
     }
 
     @Override
-    public String positionToString() {
-        return Long.toString(offset);
+    public int compareTo(Position p) {
+        return Long.compare(offset, ((MemoryPosition)p).offset);
     }
 
     @Override
-    public int compareTo(Position p) {
-        return Long.compare(offset, ((MemoryPosition)p).offset);
+    public String getKey() {
+        return "default";
     }
 }
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryTopicPosition.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryTopicPosition.java
new file mode 100644
index 0000000..1a1da40
--- /dev/null
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryTopicPosition.java
@@ -0,0 +1,22 @@
+package org.apache.aries.events.memory;
+
+import org.apache.aries.events.api.TopicPosition;
+
+public class MemoryTopicPosition implements TopicPosition {
+
+    private long offset;
+    
+    public MemoryTopicPosition(long offset) {
+        this.offset = offset;
+    }
+
+    @Override
+    public String topicPositionToString() {
+            return Long.toString(offset);
+    }
+
+    public long getOffset() {
+        return offset;
+    }
+
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
index 91a6d71..242f39e 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
@@ -23,7 +23,6 @@ import java.util.concurrent.Executors;
 import java.util.function.Consumer;
 
 import org.apache.aries.events.api.Message;
-import org.apache.aries.events.api.Position;
 import org.apache.aries.events.api.Received;
 import org.apache.aries.events.api.Seek;
 import org.apache.aries.events.api.SubscribeRequestBuilder.SubscribeRequest;
@@ -42,19 +41,18 @@ class Topic {
         this.journal = new Journal<>(keepAtLeast);
     }
 
-    public synchronized Position send(Message message) {
-        long offset = this.journal.append(message);
+    public synchronized void send(Message message) {
+        this.journal.append(message);
         notifyAll();
-        return new MemoryPosition(offset);
     }
 
     public Subscription subscribe(SubscribeRequest request) {
-        long startOffset = getStartOffset((MemoryPosition) request.getPosition(), request.getSeek());
+        long startOffset = getStartOffset((MemoryTopicPosition) request.getPosition(), request.getSeek());
         log.debug("Consuming from " + startOffset);
         return new TopicSubscription(startOffset, request.getCallback());
     }
 
-    private long getStartOffset(MemoryPosition position, Seek seek) {
+    private long getStartOffset(MemoryTopicPosition position, Seek seek) {
         if (position != null) {
             return position.getOffset();
         } else {
@@ -105,8 +103,9 @@ class Topic {
         private void handleMessage(Entry<Long, Message> entry) {
             long offset = entry.getKey();
             try {
+                Message message = entry.getValue();
                 MemoryPosition position = new MemoryPosition(this.currentOffset);
-                Received received = new Received(position, entry.getValue());
+                Received received = new Received(position, message);
                 callback.accept(received);
             } catch (Exception e) {
                 log.warn(e.getMessage(), e);
diff --git a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
index 5edee5d..55d0fa7 100644
--- a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
+++ b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
@@ -23,7 +23,6 @@ import java.util.stream.Collectors;
 
 import org.apache.aries.events.api.Message;
 import org.apache.aries.events.api.Messaging;
-import org.apache.aries.events.api.Position;
 import org.apache.aries.events.api.Received;
 import org.apache.aries.events.api.Seek;
 import org.apache.aries.events.api.SubscribeRequestBuilder;
@@ -63,9 +62,9 @@ public class MessagingTest {
     
     @Test
     public void testPositionFromString() {
-        Position pos = messaging.positionFromString("1");
-        assertThat(pos.compareTo(new MemoryPosition(1)), equalTo(0));
-        assertThat(pos.positionToString(), equalTo("1"));
+        MemoryTopicPosition pos = (MemoryTopicPosition) messaging.positionFromString("1");
+        assertThat(pos.getOffset(), equalTo(1l));
+        assertThat(pos.topicPositionToString(), equalTo("1"));
     }
     
     @Test
@@ -134,7 +133,7 @@ public class MessagingTest {
     public void testFrom1() {
         send("test", "testcontent");
         send("test", "testcontent2");
-        subscribe(to("test", callback).startAt(new MemoryPosition(1l)).seek(Seek.earliest));
+        subscribe(to("test", callback).startAt(new MemoryTopicPosition(1l)).seek(Seek.earliest));
         assertMessages(1);
         assertThat(messageContents(), contains("testcontent2"));
     }
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoMessaging.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoMessaging.java
index 454efde..f1fa6ca 100644
--- a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoMessaging.java
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoMessaging.java
@@ -28,6 +28,7 @@ import org.apache.aries.events.api.Position;
 import org.apache.aries.events.api.SubscribeRequestBuilder;
 import org.apache.aries.events.api.SubscribeRequestBuilder.SubscribeRequest;
 import org.apache.aries.events.api.Subscription;
+import org.apache.aries.events.api.TopicPosition;
 import org.bson.Document;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -64,7 +65,7 @@ public class MongoMessaging implements Messaging {
     }
 
     @Override
-    public Position positionFromString(String position) {
+    public TopicPosition positionFromString(String position) {
         long index = Long.parseLong(position);
         return position(index);
     }
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoPosition.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoPosition.java
index 910b15a..529e444 100644
--- a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoPosition.java
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoPosition.java
@@ -19,23 +19,29 @@
 package org.apache.aries.events.mongo;
 
 import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.TopicPosition;
 
-class MongoPosition implements Position {
+class MongoPosition implements Position, TopicPosition {
 
-    static Position position(long index) {
+    static MongoPosition position(long index) {
         return new MongoPosition(index);
     }
 
-    static long index(Position position) {
+    static long index(TopicPosition position) {
         return ((MongoPosition) position).index;
     }
-
+    
     @Override
-    public String positionToString() {
+    public String topicPositionToString() {
         return String.valueOf(index);
     }
 
     @Override
+    public String getKey() {
+        return Position.defaultKey;
+    }
+
+    @Override
     public int compareTo(Position o) {
         long thatIndex = ((MongoPosition) o).index;
         if (this.index > thatIndex) return 1;