You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by jg...@apache.org on 2021/06/28 21:28:54 UTC

[tomee-chatterbox] branch master updated: durableName and ackWait

This is an automated email from the ASF dual-hosted git repository.

jgallimore pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomee-chatterbox.git


The following commit(s) were added to refs/heads/master by this push:
     new bcb14fb  durableName and ackWait
     new 159c0de  Merge pull request #1 from chongma/master
bcb14fb is described below

commit bcb14fbee0a5b63d96540f9cf3ce67fb160ea637
Author: Matthew Broadhead <nb...@gmail.com>
AuthorDate: Mon Jun 28 23:21:56 2021 +0200

    durableName and ackWait
---
 .../chatterbox/nats/adapter/NATSActivationSpec.java  | 20 +++++++++++++++++++-
 .../chatterbox/nats/adapter/NATSResourceAdapter.java |  7 ++++++-
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSActivationSpec.java b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSActivationSpec.java
index 1a2fcc4..881d69b 100644
--- a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSActivationSpec.java
+++ b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSActivationSpec.java
@@ -30,8 +30,26 @@ public class NATSActivationSpec implements ActivationSpec {
     private ResourceAdapter resourceAdapter;
     private Class beanClass;
     private String subject;
+    private String durableName;
+    private String ackWait;
 
-    public Class getBeanClass() {
+    public String getDurableName() {
+		return durableName;
+	}
+
+	public void setDurableName(String durableName) {
+		this.durableName = durableName;
+	}
+
+	public String getAckWait() {
+		return ackWait;
+	}
+
+	public void setAckWait(String ackWait) {
+		this.ackWait = ackWait;
+	}
+
+	public Class getBeanClass() {
         return beanClass;
     }
 
diff --git a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSResourceAdapter.java b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSResourceAdapter.java
index 2f52de6..89c95fd 100644
--- a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSResourceAdapter.java
+++ b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSResourceAdapter.java
@@ -22,6 +22,8 @@ import io.nats.streaming.Options;
 import io.nats.streaming.StreamingConnection;
 import io.nats.streaming.StreamingConnectionFactory;
 import io.nats.streaming.Subscription;
+import io.nats.streaming.SubscriptionOptions;
+
 import org.apache.tomee.chatterbox.nats.api.InboundListener;
 import org.apache.tomee.chatterbox.nats.api.NATSException;
 import org.apache.tomee.chatterbox.nats.api.NATSMessage;
@@ -42,6 +44,7 @@ import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.lang.IllegalStateException;
@@ -112,7 +115,9 @@ public class NATSResourceAdapter implements ResourceAdapter {
                     final EndpointTarget target = new EndpointTarget(messageEndpoint);
                     targets.put(NATSActivationSpec, target);
 
-                    final Subscription subscription = connection.subscribe(((NATSActivationSpec) activationSpec).getSubject(), target);
+                    final Subscription subscription = connection.subscribe(((NATSActivationSpec) activationSpec).getSubject(), target, 
+                    		new SubscriptionOptions.Builder().startWithLastReceived().manualAcks().ackWait(Duration.ofSeconds(Integer.parseInt(((NATSActivationSpec) activationSpec).getAckWait())))
+                            .durableName(((NATSActivationSpec) activationSpec).getDurableName()).build());
                     target.setSubscription(subscription);
                 } catch (Exception e) {
                     e.printStackTrace();