You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2019/01/04 14:08:48 UTC

[aries-journaled-events] 01/01: ARIES-1880 - Use builder in subscribe

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch ARIES-1880
in repository https://gitbox.apache.org/repos/asf/aries-journaled-events.git

commit 12c21af3f7bea344f594546d74227dd1862d0eca
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Fri Jan 4 15:08:03 2019 +0100

    ARIES-1880 - Use builder in subscribe
---
 .../org/apache/aries/events/api/Messaging.java     |  3 +-
 .../apache/aries/events/api/SubscribeRequest.java  | 67 ++++++++++++++++++++++
 .../aries/events/memory/InMemoryMessaging.java     | 10 ++--
 .../java/org/apache/aries/events/memory/Topic.java | 11 ++--
 .../apache/aries/events/memory/MessagingTest.java  | 24 +++++---
 pom.xml                                            |  5 ++
 6 files changed, 97 insertions(+), 23 deletions(-)

diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
index 2cd1138..a66279d 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
@@ -18,7 +18,6 @@
 package org.apache.aries.events.api;
 
 import java.util.Map;
-import java.util.function.Consumer;
 
 /**
  * Journaled messaging API
@@ -39,7 +38,7 @@ public interface Messaging {
      * @param callback will be called for each message received
      * @return Returned subscription must be closed by the caller to unsubscribe
      */
-    Subscription subscribe(String topic, Position position, Seek seek, Consumer<Received> callback);
+    Subscription subscribe(SubscribeRequest request);
 
     /**
      * Create a message with payload and metadata
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequest.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequest.java
new file mode 100644
index 0000000..a209968
--- /dev/null
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The SF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.aries.events.api;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.function.Consumer;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+@ParametersAreNonnullByDefault
+public class SubscribeRequest {
+    private final String topic;
+    private final Consumer<Received> callback;
+    private Position position;
+    private Seek seek = Seek.latest;
+    
+    public SubscribeRequest(String topic, Consumer<Received> callback) {
+        this.topic = topic;
+        this.callback = callback;
+    }
+
+    public static SubscribeRequest to(String topic, Consumer<Received> callback) {
+        return new SubscribeRequest(topic, callback);
+    }
+    
+    public SubscribeRequest startAt(Position position) {
+        this.position = position;
+        return this;
+    }
+    
+    public SubscribeRequest seek(Seek seek) {
+        this.seek = requireNonNull(seek, "Seek must not be null");
+        return this;
+    }
+    
+    public String getTopic() {
+        return topic;
+    }
+    
+    public Position getPosition() {
+        return position;
+    }
+    
+    public Seek getSeek() {
+        return seek;
+    }
+    
+    public Consumer<Received> getCallback() {
+        return callback;
+    }
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
index 8966c16..634887b 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
@@ -19,13 +19,11 @@ package org.apache.aries.events.memory;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
 
 import org.apache.aries.events.api.Message;
 import org.apache.aries.events.api.Messaging;
 import org.apache.aries.events.api.Position;
-import org.apache.aries.events.api.Received;
-import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.SubscribeRequest;
 import org.apache.aries.events.api.Subscription;
 import org.apache.aries.events.api.Type;
 import org.osgi.service.component.annotations.Component;
@@ -42,9 +40,9 @@ public class InMemoryMessaging implements Messaging {
     }
 
     @Override
-    public Subscription subscribe(String topicName, Position position, Seek seek, Consumer<Received> callback) {
-        Topic topic = getOrCreate(topicName);
-        return topic.subscribe(position, seek, callback);
+    public Subscription subscribe(SubscribeRequest request) {
+        Topic topic = getOrCreate(request.getTopic());
+        return topic.subscribe(request);
     }
 
     @Override
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
index e9175a4..8d4755e 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
@@ -27,6 +27,7 @@ import org.apache.aries.events.api.Message;
 import org.apache.aries.events.api.Position;
 import org.apache.aries.events.api.Received;
 import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.SubscribeRequest;
 import org.apache.aries.events.api.Subscription;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,10 +48,10 @@ public class Topic {
         return new MemoryPosition(offset);
     }
 
-    public Subscription subscribe(Position position, Seek seek, Consumer<Received> callback) {
-        long startOffset = getStartOffset((MemoryPosition) position, seek);
+    public Subscription subscribe(SubscribeRequest request) {
+        long startOffset = getStartOffset((MemoryPosition) request.getPosition(), request.getSeek());
         log.debug("Consuming from " + startOffset);
-        return new TopicSubscription(startOffset, callback);
+        return new TopicSubscription(startOffset, request.getCallback());
     }
 
     private long getStartOffset(MemoryPosition position, Seek seek) {
@@ -59,10 +60,8 @@ public class Topic {
         } else {
             if (seek == Seek.earliest) {
                 return this.journal.getFirstOffset();
-            } else if (seek == Seek.latest) {
-                return this.journal.getLastOffset() + 1;
             } else {
-                throw new IllegalArgumentException("Seek must not be null");
+                return this.journal.getLastOffset() + 1;
             }
         }
     }
diff --git a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
index 62d88c0..aaf77e0 100644
--- a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
+++ b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
@@ -1,5 +1,6 @@
 package org.apache.aries.events.memory;
 
+import static org.apache.aries.events.api.SubscribeRequest.to;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertEquals;
@@ -23,6 +24,7 @@ import org.apache.aries.events.api.Messaging;
 import org.apache.aries.events.api.Position;
 import org.apache.aries.events.api.Received;
 import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.SubscribeRequest;
 import org.apache.aries.events.api.Subscription;
 import org.junit.After;
 import org.junit.Before;
@@ -64,7 +66,7 @@ public class MessagingTest {
     
     @Test
     public void testSend() {
-        subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+        subscribe(to("test", callback).seek(Seek.earliest));
         String content = "testcontent";
         send("test", content);
         verify(callback, timeout(1000)).accept(messageCaptor.capture());
@@ -75,22 +77,22 @@ public class MessagingTest {
         assertThat(received.getMessage().getProperties().get("my"), equalTo("testvalue"));
     }
     
-    @Test(expected=IllegalArgumentException.class)
+    @Test(expected=NullPointerException.class)
     public void testInvalidSubscribe() {
-        messaging.subscribe("test", null, null, callback);
+        subscribe(to("test", callback).seek(null));
     }
     
     @Test
     public void testExceptionInHandler() {
         doThrow(new RuntimeException("Expected exception")).when(callback).accept(Mockito.any(Received.class));
-        subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+        subscribe(to("test", callback));
         send("test", "testcontent");
         verify(callback, timeout(1000)).accept(messageCaptor.capture());
     }
 
     @Test
     public void testEarliestBefore() {
-        subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+        subscribe(to("test", callback).seek(Seek.earliest));
         send("test", "testcontent");
         send("test", "testcontent2");
         verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
@@ -100,7 +102,7 @@ public class MessagingTest {
     @Test
     public void testEarliestAfter() {
         send("test", "testcontent");
-        subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+        subscribe(to("test", callback).seek(Seek.earliest));
         send("test", "testcontent2");
         verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
         assertThat(messageContents(), contains("testcontent", "testcontent2"));
@@ -108,7 +110,7 @@ public class MessagingTest {
     
     @Test
     public void testLatestBefore() {
-        subscriptions.add(messaging.subscribe("test", null, Seek.latest, callback));
+        subscribe(to("test", callback));
         send("test", "testcontent");
         send("test", "testcontent2");
         verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
@@ -118,7 +120,7 @@ public class MessagingTest {
     @Test
     public void testLatest() {
         send("test", "testcontent");
-        subscriptions.add(messaging.subscribe("test", null, Seek.latest, callback));
+        subscribe(to("test", callback));
         send("test", "testcontent2");
         verify(callback, timeout(1000)).accept(messageCaptor.capture());
         assertThat(messageContents(), contains("testcontent2"));
@@ -128,10 +130,14 @@ public class MessagingTest {
     public void testFrom1() {
         send("test", "testcontent");
         send("test", "testcontent2");
-        subscriptions.add(messaging.subscribe("test", new MemoryPosition(1l), Seek.earliest, callback));
+        subscribe(to("test", callback).startAt(new MemoryPosition(1l)).seek(Seek.earliest));
         verify(callback, timeout(1000)).accept(messageCaptor.capture());
         assertThat(messageContents(), contains("testcontent2"));
     }
+    
+    private void subscribe(SubscribeRequest request) {
+        this.subscriptions.add(messaging.subscribe(request));
+    }
 
     private List<String> messageContents() {
         return messageCaptor.getAllValues().stream()
diff --git a/pom.xml b/pom.xml
index bf7e8b4..8010756 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,6 +81,11 @@
             <artifactId>slf4j-api</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+            <version>2.0.0</version>
+        </dependency>
 
         <dependency>
             <groupId>org.slf4j</groupId>