You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2019/01/04 14:08:48 UTC
[aries-journaled-events] 01/01: ARIES-1880 - Use builder in
subscribe
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch ARIES-1880
in repository https://gitbox.apache.org/repos/asf/aries-journaled-events.git
commit 12c21af3f7bea344f594546d74227dd1862d0eca
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Fri Jan 4 15:08:03 2019 +0100
ARIES-1880 - Use builder in subscribe
---
.../org/apache/aries/events/api/Messaging.java | 3 +-
.../apache/aries/events/api/SubscribeRequest.java | 67 ++++++++++++++++++++++
.../aries/events/memory/InMemoryMessaging.java | 10 ++--
.../java/org/apache/aries/events/memory/Topic.java | 11 ++--
.../apache/aries/events/memory/MessagingTest.java | 24 +++++---
pom.xml | 5 ++
6 files changed, 97 insertions(+), 23 deletions(-)
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
index 2cd1138..a66279d 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
@@ -18,7 +18,6 @@
package org.apache.aries.events.api;
import java.util.Map;
-import java.util.function.Consumer;
/**
* Journaled messaging API
@@ -39,7 +38,7 @@ public interface Messaging {
* @param callback will be called for each message received
* @return Returned subscription must be closed by the caller to unsubscribe
*/
- Subscription subscribe(String topic, Position position, Seek seek, Consumer<Received> callback);
+ Subscription subscribe(SubscribeRequest request);
/**
* Create a message with payload and metadata
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequest.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequest.java
new file mode 100644
index 0000000..a209968
--- /dev/null
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.aries.events.api;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.function.Consumer;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+@ParametersAreNonnullByDefault
+public class SubscribeRequest {
+ private final String topic;
+ private final Consumer<Received> callback;
+ private Position position;
+ private Seek seek = Seek.latest;
+
+ public SubscribeRequest(String topic, Consumer<Received> callback) {
+ this.topic = topic;
+ this.callback = callback;
+ }
+
+ public static SubscribeRequest to(String topic, Consumer<Received> callback) {
+ return new SubscribeRequest(topic, callback);
+ }
+
+ public SubscribeRequest startAt(Position position) {
+ this.position = position;
+ return this;
+ }
+
+ public SubscribeRequest seek(Seek seek) {
+ this.seek = requireNonNull(seek, "Seek must not be null");
+ return this;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public Position getPosition() {
+ return position;
+ }
+
+ public Seek getSeek() {
+ return seek;
+ }
+
+ public Consumer<Received> getCallback() {
+ return callback;
+ }
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
index 8966c16..634887b 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
@@ -19,13 +19,11 @@ package org.apache.aries.events.memory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
import org.apache.aries.events.api.Message;
import org.apache.aries.events.api.Messaging;
import org.apache.aries.events.api.Position;
-import org.apache.aries.events.api.Received;
-import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.SubscribeRequest;
import org.apache.aries.events.api.Subscription;
import org.apache.aries.events.api.Type;
import org.osgi.service.component.annotations.Component;
@@ -42,9 +40,9 @@ public class InMemoryMessaging implements Messaging {
}
@Override
- public Subscription subscribe(String topicName, Position position, Seek seek, Consumer<Received> callback) {
- Topic topic = getOrCreate(topicName);
- return topic.subscribe(position, seek, callback);
+ public Subscription subscribe(SubscribeRequest request) {
+ Topic topic = getOrCreate(request.getTopic());
+ return topic.subscribe(request);
}
@Override
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
index e9175a4..8d4755e 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
@@ -27,6 +27,7 @@ import org.apache.aries.events.api.Message;
import org.apache.aries.events.api.Position;
import org.apache.aries.events.api.Received;
import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.SubscribeRequest;
import org.apache.aries.events.api.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,10 +48,10 @@ public class Topic {
return new MemoryPosition(offset);
}
- public Subscription subscribe(Position position, Seek seek, Consumer<Received> callback) {
- long startOffset = getStartOffset((MemoryPosition) position, seek);
+ public Subscription subscribe(SubscribeRequest request) {
+ long startOffset = getStartOffset((MemoryPosition) request.getPosition(), request.getSeek());
log.debug("Consuming from " + startOffset);
- return new TopicSubscription(startOffset, callback);
+ return new TopicSubscription(startOffset, request.getCallback());
}
private long getStartOffset(MemoryPosition position, Seek seek) {
@@ -59,10 +60,8 @@ public class Topic {
} else {
if (seek == Seek.earliest) {
return this.journal.getFirstOffset();
- } else if (seek == Seek.latest) {
- return this.journal.getLastOffset() + 1;
} else {
- throw new IllegalArgumentException("Seek must not be null");
+ return this.journal.getLastOffset() + 1;
}
}
}
diff --git a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
index 62d88c0..aaf77e0 100644
--- a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
+++ b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
@@ -1,5 +1,6 @@
package org.apache.aries.events.memory;
+import static org.apache.aries.events.api.SubscribeRequest.to;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertEquals;
@@ -23,6 +24,7 @@ import org.apache.aries.events.api.Messaging;
import org.apache.aries.events.api.Position;
import org.apache.aries.events.api.Received;
import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.SubscribeRequest;
import org.apache.aries.events.api.Subscription;
import org.junit.After;
import org.junit.Before;
@@ -64,7 +66,7 @@ public class MessagingTest {
@Test
public void testSend() {
- subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+ subscribe(to("test", callback).seek(Seek.earliest));
String content = "testcontent";
send("test", content);
verify(callback, timeout(1000)).accept(messageCaptor.capture());
@@ -75,22 +77,22 @@ public class MessagingTest {
assertThat(received.getMessage().getProperties().get("my"), equalTo("testvalue"));
}
- @Test(expected=IllegalArgumentException.class)
+ @Test(expected=NullPointerException.class)
public void testInvalidSubscribe() {
- messaging.subscribe("test", null, null, callback);
+ subscribe(to("test", callback).seek(null));
}
@Test
public void testExceptionInHandler() {
doThrow(new RuntimeException("Expected exception")).when(callback).accept(Mockito.any(Received.class));
- subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+ subscribe(to("test", callback));
send("test", "testcontent");
verify(callback, timeout(1000)).accept(messageCaptor.capture());
}
@Test
public void testEarliestBefore() {
- subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+ subscribe(to("test", callback).seek(Seek.earliest));
send("test", "testcontent");
send("test", "testcontent2");
verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
@@ -100,7 +102,7 @@ public class MessagingTest {
@Test
public void testEarliestAfter() {
send("test", "testcontent");
- subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+ subscribe(to("test", callback).seek(Seek.earliest));
send("test", "testcontent2");
verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
assertThat(messageContents(), contains("testcontent", "testcontent2"));
@@ -108,7 +110,7 @@ public class MessagingTest {
@Test
public void testLatestBefore() {
- subscriptions.add(messaging.subscribe("test", null, Seek.latest, callback));
+ subscribe(to("test", callback));
send("test", "testcontent");
send("test", "testcontent2");
verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
@@ -118,7 +120,7 @@ public class MessagingTest {
@Test
public void testLatest() {
send("test", "testcontent");
- subscriptions.add(messaging.subscribe("test", null, Seek.latest, callback));
+ subscribe(to("test", callback));
send("test", "testcontent2");
verify(callback, timeout(1000)).accept(messageCaptor.capture());
assertThat(messageContents(), contains("testcontent2"));
@@ -128,10 +130,14 @@ public class MessagingTest {
public void testFrom1() {
send("test", "testcontent");
send("test", "testcontent2");
- subscriptions.add(messaging.subscribe("test", new MemoryPosition(1l), Seek.earliest, callback));
+ subscribe(to("test", callback).startAt(new MemoryPosition(1l)).seek(Seek.earliest));
verify(callback, timeout(1000)).accept(messageCaptor.capture());
assertThat(messageContents(), contains("testcontent2"));
}
+
+ private void subscribe(SubscribeRequest request) {
+ this.subscriptions.add(messaging.subscribe(request));
+ }
private List<String> messageContents() {
return messageCaptor.getAllValues().stream()
diff --git a/pom.xml b/pom.xml
index bf7e8b4..8010756 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,6 +81,11 @@
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>2.0.0</version>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>