You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2019/01/08 13:15:01 UTC
[aries-journaled-events] 01/01: ARIES-1882 - More scalable API
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch ARIES-1882
in repository https://gitbox.apache.org/repos/asf/aries-journaled-events.git
commit 8423d35fe016ceba5f5fb20ab5038a5105246e5a
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Tue Jan 8 14:14:33 2019 +0100
ARIES-1882 - More scalable API
---
.../java/org/apache/aries/events/api/Message.java | 4 ++++
.../org/apache/aries/events/api/Messaging.java | 2 +-
.../java/org/apache/aries/events/api/Position.java | 12 +++++++-----
.../aries/events/api/SubscribeRequestBuilder.java | 21 ++++++++++++++++-----
.../org/apache/aries/events/api/TopicPosition.java | 5 +++++
.../aries/events/memory/InMemoryMessaging.java | 6 +++---
.../apache/aries/events/memory/MemoryPosition.java | 8 ++++----
.../aries/events/memory/MemoryTopicPosition.java | 22 ++++++++++++++++++++++
.../java/org/apache/aries/events/memory/Topic.java | 13 ++++++-------
.../apache/aries/events/memory/MessagingTest.java | 9 ++++-----
.../apache/aries/events/mongo/MongoMessaging.java | 3 ++-
.../apache/aries/events/mongo/MongoPosition.java | 16 +++++++++++-----
12 files changed, 85 insertions(+), 36 deletions(-)
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
index 529c980..9be6ddb 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
@@ -24,8 +24,12 @@ import static java.util.Collections.unmodifiableMap;
/**
* TODO If we allow wild card consumption then a message also needs a topic
+ *
+ * The property key "key" is a special property. For systems that support sharding
+ * the key can be used to indirectly select the partition to be used.
*/
public final class Message {
+ public static final String KEY = "key";
private final byte[] payload;
private final Map<String, String> properties;
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
index a88954c..f245228 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
@@ -46,6 +46,6 @@ public interface Messaging {
* @param position
* @return
*/
- Position positionFromString(String position);
+ TopicPosition positionFromString(String position);
}
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java
index e211638..0873bdf 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java
@@ -18,18 +18,20 @@
package org.apache.aries.events.api;
/**
- * Position in a the topic.
- * E.g. For a kafka implementation this would be a list of (partition, offset) as we do not support partitions
- * this could simply be like an offset.
+ * Position of a message in a topic.
*
- * The {@code Position} positions are ordered. The relative order between
+ * The positions are ordered. The relative order between
* two positions can be computed by invoking {@code Comparable#compareTo}.
* Comparing this position with a specified position will return a negative
* integer, zero, or a positive integer as this position happened before,
* happened concurrently, or happened after the specified position.
+ *
+ * In systems that use sharding (like Apache Kafka) positions are only comparable
+ * if they have the same key.
*/
public interface Position extends Comparable<Position> {
+ public static final String defaultKey = "default";
- String positionToString();
+ String getKey();
}
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequestBuilder.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequestBuilder.java
index 71cf838..5c8ebb0 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequestBuilder.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequestBuilder.java
@@ -1,6 +1,6 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
+ * Licensed to the Apache Software Foundation (ASF) under one
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
@@ -49,7 +49,7 @@ public final class SubscribeRequestBuilder {
* @param position in the topic to start consuming from
* @return the updated subscribe request
*/
- public SubscribeRequestBuilder startAt(Position position) {
+ public SubscribeRequestBuilder startAt(TopicPosition position) {
this.subscribeRequest.position = position;
return this;
}
@@ -66,6 +66,11 @@ public final class SubscribeRequestBuilder {
this.subscribeRequest.seek = requireNonNull(seek, "Seek must not be null");
return this;
}
+
+ public SubscribeRequestBuilder groupId(String groupId) {
+ this.subscribeRequest.groupId = groupId;
+ return this;
+ }
public SubscribeRequest build() {
@@ -75,9 +80,10 @@ public final class SubscribeRequestBuilder {
public static class SubscribeRequest {
private final String topic;
private final Consumer<Received> callback;
- private Position position;
+ private TopicPosition position;
private Seek seek = Seek.latest;
-
+ private String groupId;
+
private SubscribeRequest(String topic, Consumer<Received> callback) {
this.topic = topic;
this.callback = callback;
@@ -87,7 +93,7 @@ public final class SubscribeRequestBuilder {
return topic;
}
- public Position getPosition() {
+ public TopicPosition getPosition() {
return position;
}
@@ -98,5 +104,10 @@ public final class SubscribeRequestBuilder {
public Consumer<Received> getCallback() {
return callback;
}
+
+ public String getGroupId() {
+ return groupId;
+ }
}
}
+
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/TopicPosition.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/TopicPosition.java
new file mode 100644
index 0000000..9f00a2f
--- /dev/null
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/TopicPosition.java
@@ -0,0 +1,5 @@
+package org.apache.aries.events.api;
+
+public interface TopicPosition {
+ String topicPositionToString();
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
index 674864a..40f3cf8 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
@@ -22,10 +22,10 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.aries.events.api.Message;
import org.apache.aries.events.api.Messaging;
-import org.apache.aries.events.api.Position;
import org.apache.aries.events.api.SubscribeRequestBuilder;
import org.apache.aries.events.api.SubscribeRequestBuilder.SubscribeRequest;
import org.apache.aries.events.api.Subscription;
+import org.apache.aries.events.api.TopicPosition;
import org.apache.aries.events.api.Type;
import org.osgi.service.component.annotations.Component;
@@ -58,9 +58,9 @@ public class InMemoryMessaging implements Messaging {
}
@Override
- public Position positionFromString(String position) {
+ public TopicPosition positionFromString(String position) {
long offset = Long.parseLong(position);
- return new MemoryPosition(offset);
+ return new MemoryTopicPosition(offset);
}
private Topic getOrCreate(String topicName) {
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
index fab1bea..bcc8f17 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
@@ -32,12 +32,12 @@ class MemoryPosition implements Position {
}
@Override
- public String positionToString() {
- return Long.toString(offset);
+ public int compareTo(Position p) {
+ return Long.compare(offset, ((MemoryPosition)p).offset);
}
@Override
- public int compareTo(Position p) {
- return Long.compare(offset, ((MemoryPosition)p).offset);
+ public String getKey() {
+ return "default";
}
}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryTopicPosition.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryTopicPosition.java
new file mode 100644
index 0000000..1a1da40
--- /dev/null
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryTopicPosition.java
@@ -0,0 +1,22 @@
+package org.apache.aries.events.memory;
+
+import org.apache.aries.events.api.TopicPosition;
+
+public class MemoryTopicPosition implements TopicPosition {
+
+ private long offset;
+
+ public MemoryTopicPosition(long offset) {
+ this.offset = offset;
+ }
+
+ @Override
+ public String topicPositionToString() {
+ return Long.toString(offset);
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
index 91a6d71..242f39e 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
@@ -23,7 +23,6 @@ import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.apache.aries.events.api.Message;
-import org.apache.aries.events.api.Position;
import org.apache.aries.events.api.Received;
import org.apache.aries.events.api.Seek;
import org.apache.aries.events.api.SubscribeRequestBuilder.SubscribeRequest;
@@ -42,19 +41,18 @@ class Topic {
this.journal = new Journal<>(keepAtLeast);
}
- public synchronized Position send(Message message) {
- long offset = this.journal.append(message);
+ public synchronized void send(Message message) {
+ this.journal.append(message);
notifyAll();
- return new MemoryPosition(offset);
}
public Subscription subscribe(SubscribeRequest request) {
- long startOffset = getStartOffset((MemoryPosition) request.getPosition(), request.getSeek());
+ long startOffset = getStartOffset((MemoryTopicPosition) request.getPosition(), request.getSeek());
log.debug("Consuming from " + startOffset);
return new TopicSubscription(startOffset, request.getCallback());
}
- private long getStartOffset(MemoryPosition position, Seek seek) {
+ private long getStartOffset(MemoryTopicPosition position, Seek seek) {
if (position != null) {
return position.getOffset();
} else {
@@ -105,8 +103,9 @@ class Topic {
private void handleMessage(Entry<Long, Message> entry) {
long offset = entry.getKey();
try {
+ Message message = entry.getValue();
MemoryPosition position = new MemoryPosition(this.currentOffset);
- Received received = new Received(position, entry.getValue());
+ Received received = new Received(position, message);
callback.accept(received);
} catch (Exception e) {
log.warn(e.getMessage(), e);
diff --git a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
index 5edee5d..55d0fa7 100644
--- a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
+++ b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
@@ -23,7 +23,6 @@ import java.util.stream.Collectors;
import org.apache.aries.events.api.Message;
import org.apache.aries.events.api.Messaging;
-import org.apache.aries.events.api.Position;
import org.apache.aries.events.api.Received;
import org.apache.aries.events.api.Seek;
import org.apache.aries.events.api.SubscribeRequestBuilder;
@@ -63,9 +62,9 @@ public class MessagingTest {
@Test
public void testPositionFromString() {
- Position pos = messaging.positionFromString("1");
- assertThat(pos.compareTo(new MemoryPosition(1)), equalTo(0));
- assertThat(pos.positionToString(), equalTo("1"));
+ MemoryTopicPosition pos = (MemoryTopicPosition) messaging.positionFromString("1");
+ assertThat(pos.getOffset(), equalTo(1l));
+ assertThat(pos.topicPositionToString(), equalTo("1"));
}
@Test
@@ -134,7 +133,7 @@ public class MessagingTest {
public void testFrom1() {
send("test", "testcontent");
send("test", "testcontent2");
- subscribe(to("test", callback).startAt(new MemoryPosition(1l)).seek(Seek.earliest));
+ subscribe(to("test", callback).startAt(new MemoryTopicPosition(1l)).seek(Seek.earliest));
assertMessages(1);
assertThat(messageContents(), contains("testcontent2"));
}
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoMessaging.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoMessaging.java
index 454efde..f1fa6ca 100644
--- a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoMessaging.java
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoMessaging.java
@@ -28,6 +28,7 @@ import org.apache.aries.events.api.Position;
import org.apache.aries.events.api.SubscribeRequestBuilder;
import org.apache.aries.events.api.SubscribeRequestBuilder.SubscribeRequest;
import org.apache.aries.events.api.Subscription;
+import org.apache.aries.events.api.TopicPosition;
import org.bson.Document;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -64,7 +65,7 @@ public class MongoMessaging implements Messaging {
}
@Override
- public Position positionFromString(String position) {
+ public TopicPosition positionFromString(String position) {
long index = Long.parseLong(position);
return position(index);
}
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoPosition.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoPosition.java
index 910b15a..529e444 100644
--- a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoPosition.java
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoPosition.java
@@ -19,23 +19,29 @@
package org.apache.aries.events.mongo;
import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.TopicPosition;
-class MongoPosition implements Position {
+class MongoPosition implements Position, TopicPosition {
- static Position position(long index) {
+ static MongoPosition position(long index) {
return new MongoPosition(index);
}
- static long index(Position position) {
+ static long index(TopicPosition position) {
return ((MongoPosition) position).index;
}
-
+
@Override
- public String positionToString() {
+ public String topicPositionToString() {
return String.valueOf(index);
}
@Override
+ public String getKey() {
+ return Position.defaultKey;
+ }
+
+ @Override
public int compareTo(Position o) {
long thatIndex = ((MongoPosition) o).index;
if (this.index > thatIndex) return 1;