You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by jg...@apache.org on 2021/06/16 13:36:21 UTC
[tomee-chatterbox] 02/06: WIP
This is an automated email from the ASF dual-hosted git repository.
jgallimore pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomee-chatterbox.git
commit ccb9450089f0d3c6dbf39e8455c53e6eb7271fe1
Author: Jonathan Gallimore <jo...@jrg.me.uk>
AuthorDate: Wed Jun 9 11:44:40 2021 +0100
WIP
---
chatterbox-nats/chatterbox-nats-api/pom.xml | 6 ++
.../tomee/chatterbox/nats/api/InboundListener.java | 4 +-
.../tomee/chatterbox/nats/api/NATSConnection.java | 2 +-
.../tomee/chatterbox/nats/api/NATSException.java | 18 ++++
.../tomee/chatterbox/nats/api/NATSMessage.java | 23 -----
chatterbox-nats/chatterbox-nats-impl/pom.xml | 1 +
.../nats/adapter/NATSActivationSpec.java | 9 ++
.../nats/adapter/NATSResourceAdapter.java | 101 +++++++++++++++++----
.../nats/adapter/out/NATSConnectionImpl.java | 5 +-
.../nats/adapter/out/NATSManagedConnection.java | 5 +-
.../src/main/rar/META-INF/ra.xml | 14 +--
chatterbox-nats/chatterbox-nats-sample-war/pom.xml | 9 +-
.../superbiz/{SystemBean.java => EchoBean.java} | 22 ++++-
.../src/main/java/org/superbiz/Sender.java | 11 ++-
14 files changed, 166 insertions(+), 64 deletions(-)
diff --git a/chatterbox-nats/chatterbox-nats-api/pom.xml b/chatterbox-nats/chatterbox-nats-api/pom.xml
index 6f4ca62..db16e68 100644
--- a/chatterbox-nats/chatterbox-nats-api/pom.xml
+++ b/chatterbox-nats/chatterbox-nats-api/pom.xml
@@ -45,5 +45,11 @@
<artifactId>javaee-api</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>io.nats</groupId>
+ <artifactId>java-nats-streaming</artifactId>
+ <version>2.2.3</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/InboundListener.java b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/InboundListener.java
index 5f7ea6c..94d6294 100644
--- a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/InboundListener.java
+++ b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/InboundListener.java
@@ -16,11 +16,13 @@
*/
package org.apache.tomee.chatterbox.nats.api;
+import io.nats.streaming.Message;
+
/**
* @version $Revision$ $Date$
*/
public interface InboundListener {
- public void onMessage(final NATSMessage message) throws NATSException;
+ public void onMessage(final Message message) throws NATSException;
}
diff --git a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSConnection.java b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSConnection.java
index e323dd1..8b39874 100755
--- a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSConnection.java
+++ b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSConnection.java
@@ -20,7 +20,7 @@
package org.apache.tomee.chatterbox.nats.api;
public interface NATSConnection {
- public void sendMessage(final String channel, final String message);
+ public void publish(final String subject, final byte[] data) throws NATSException;
public void close();
}
diff --git a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSException.java b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSException.java
index 031ad42..b761cd3 100644
--- a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSException.java
+++ b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSException.java
@@ -20,4 +20,22 @@ package org.apache.tomee.chatterbox.nats.api;
* @version $Revision$ $Date$
*/
public class NATSException extends Exception {
+ public NATSException() {
+ }
+
+ public NATSException(String message) {
+ super(message);
+ }
+
+ public NATSException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public NATSException(Throwable cause) {
+ super(cause);
+ }
+
+ public NATSException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
}
diff --git a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSMessage.java b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSMessage.java
deleted file mode 100644
index afb7718..0000000
--- a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSMessage.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tomee.chatterbox.nats.api;
-
-/**
- * @version $Revision$ $Date$
- */
-public class NATSMessage {
-}
diff --git a/chatterbox-nats/chatterbox-nats-impl/pom.xml b/chatterbox-nats/chatterbox-nats-impl/pom.xml
index fbe5f30..d46e53a 100644
--- a/chatterbox-nats/chatterbox-nats-impl/pom.xml
+++ b/chatterbox-nats/chatterbox-nats-impl/pom.xml
@@ -59,6 +59,7 @@
<groupId>io.nats</groupId>
<artifactId>java-nats-streaming</artifactId>
<version>2.2.3</version>
+ <scope>provided</scope>
</dependency>
</dependencies>
</project>
diff --git a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSActivationSpec.java b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSActivationSpec.java
index 4b62580..1a2fcc4 100644
--- a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSActivationSpec.java
+++ b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSActivationSpec.java
@@ -29,6 +29,7 @@ public class NATSActivationSpec implements ActivationSpec {
private ResourceAdapter resourceAdapter;
private Class beanClass;
+ private String subject;
public Class getBeanClass() {
return beanClass;
@@ -38,6 +39,14 @@ public class NATSActivationSpec implements ActivationSpec {
this.beanClass = beanClass;
}
+ public String getSubject() {
+ return subject;
+ }
+
+ public void setSubject(String subject) {
+ this.subject = subject;
+ }
+
@Override
public void validate() throws InvalidPropertyException {
}
diff --git a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSResourceAdapter.java b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSResourceAdapter.java
index 54d3a62..804a24f 100644
--- a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSResourceAdapter.java
+++ b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSResourceAdapter.java
@@ -16,6 +16,15 @@
*/
package org.apache.tomee.chatterbox.nats.adapter;
+import io.nats.streaming.Message;
+import io.nats.streaming.MessageHandler;
+import io.nats.streaming.Options;
+import io.nats.streaming.StreamingConnection;
+import io.nats.streaming.StreamingConnectionFactory;
+import io.nats.streaming.Subscription;
+import org.apache.tomee.chatterbox.nats.api.InboundListener;
+import org.apache.tomee.chatterbox.nats.api.NATSException;
+
import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
@@ -28,6 +37,8 @@ import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;
+import java.io.IOException;
+import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.lang.IllegalStateException;
@@ -36,20 +47,43 @@ import java.lang.IllegalStateException;
public class NATSResourceAdapter implements ResourceAdapter {
final Map<NATSActivationSpec, EndpointTarget> targets = new ConcurrentHashMap<NATSActivationSpec, EndpointTarget>();
+ private static final Method ONMESSAGE;
+
+ static {
+ try {
+ ONMESSAGE = InboundListener.class.getMethod("onMessage", Message.class);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@ConfigProperty
- private String token;
+ private String baseAddress;
private WorkManager workManager;
- private String user;
- private String userId;
+ private StreamingConnectionFactory cf;
+ private StreamingConnection connection;
public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
workManager = bootstrapContext.getWorkManager();
- // connect to NATS
+
+ try {
+ cf = new
+ StreamingConnectionFactory(new Options.Builder().natsUrl(baseAddress)
+ .clusterId("cluster-id").clientId("client-id").build());
+
+ connection = cf.createConnection();
+ } catch (Throwable t) {
+ // TODO: log this
+ }
}
public void stop() {
- // disconnect
+ try {
+ connection.close();
+ } catch (Throwable t) {
+ // TODO: log this
+ }
}
public void endpointActivation(final MessageEndpointFactory messageEndpointFactory, final ActivationSpec activationSpec)
@@ -64,11 +98,10 @@ public class NATSResourceAdapter implements ResourceAdapter {
final MessageEndpoint messageEndpoint = messageEndpointFactory.createEndpoint(null);
final EndpointTarget target = new EndpointTarget(messageEndpoint);
- final Class<?> endpointClass = NATSActivationSpec.getBeanClass() != null ? NATSActivationSpec
- .getBeanClass() : messageEndpointFactory.getEndpointClass();
-
-
targets.put(NATSActivationSpec, target);
+
+ final Subscription subscription = connection.subscribe(((NATSActivationSpec) activationSpec).getSubject(), target);
+ target.setSubscription(subscription);
} catch (Exception e) {
e.printStackTrace();
}
@@ -83,15 +116,14 @@ public class NATSResourceAdapter implements ResourceAdapter {
}
public void endpointDeactivation(MessageEndpointFactory messageEndpointFactory, ActivationSpec activationSpec) {
- final NATSActivationSpec telnetActivationSpec = (NATSActivationSpec) activationSpec;
+ final NATSActivationSpec natsActivationSpec = (NATSActivationSpec) activationSpec;
- final EndpointTarget endpointTarget = targets.get(telnetActivationSpec);
+ final EndpointTarget endpointTarget = targets.get(natsActivationSpec);
if (endpointTarget == null) {
throw new IllegalStateException("No EndpointTarget to undeploy for ActivationSpec " + activationSpec);
}
- // unsubscribe
-
+ endpointTarget.close();
endpointTarget.messageEndpoint.release();
}
@@ -99,17 +131,54 @@ public class NATSResourceAdapter implements ResourceAdapter {
return new XAResource[0];
}
- public void sendMessage(final String channel, final String message) {
+ public void publish(final String subject, final byte[] data) throws NATSException {
// publish a message
+ try {
+ connection.publish(subject, data);
+ } catch (Exception e) {
+ throw new NATSException(e);
+ }
}
- private static class EndpointTarget {
+ private static class EndpointTarget implements MessageHandler {
private final MessageEndpoint messageEndpoint;
+ private Subscription subscription;
- public EndpointTarget(MessageEndpoint messageEndpoint) {
+ public EndpointTarget(final MessageEndpoint messageEndpoint) {
this.messageEndpoint = messageEndpoint;
}
+ @Override
+ public void onMessage(final Message msg) {
+ try {
+ try {
+ messageEndpoint.beforeDelivery(ONMESSAGE);
+ ((InboundListener) messageEndpoint).onMessage(msg);
+ } finally {
+ messageEndpoint.afterDelivery();
+ }
+ } catch (Throwable t) {
+ // TODO: log this
+ }
+ }
+
+ public void setSubscription(final Subscription subscription) {
+ this.subscription = subscription;
+ }
+
+ public Subscription getSubscription() {
+ return subscription;
+ }
+
+ public void close() {
+ try {
+ if (subscription != null) {
+ subscription.close(true);
+ }
+ } catch (IOException e) {
+ // TODO: log this
+ }
+ }
}
}
diff --git a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSConnectionImpl.java b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSConnectionImpl.java
index 8b375f7..d743b9f 100755
--- a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSConnectionImpl.java
+++ b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSConnectionImpl.java
@@ -19,6 +19,7 @@
package org.apache.tomee.chatterbox.nats.adapter.out;
import org.apache.tomee.chatterbox.nats.api.NATSConnection;
+import org.apache.tomee.chatterbox.nats.api.NATSException;
import java.util.logging.Logger;
@@ -34,8 +35,8 @@ public class NATSConnectionImpl implements NATSConnection {
this.mcf = mcf;
}
- public void sendMessage(final String channel, final String message) {
- mc.sendMessage(channel, message);
+ public void publish(final String subject, final byte[] data) throws NATSException {
+ mc.publish(subject, data);
}
public void close() {
diff --git a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSManagedConnection.java b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSManagedConnection.java
index 0999036..d7043b2 100755
--- a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSManagedConnection.java
+++ b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSManagedConnection.java
@@ -20,6 +20,7 @@ package org.apache.tomee.chatterbox.nats.adapter.out;
import org.apache.tomee.chatterbox.nats.adapter.NATSResourceAdapter;
import org.apache.tomee.chatterbox.nats.api.NATSConnection;
+import org.apache.tomee.chatterbox.nats.api.NATSException;
import javax.resource.NotSupportedException;
import javax.resource.ResourceException;
@@ -131,10 +132,10 @@ public class NATSManagedConnection implements ManagedConnection {
return new NATSManagedConnectionMetaData();
}
- void sendMessage(final String channel, final String message) {
+ void publish(final String subject, final byte[] data) throws NATSException {
log.finest("sendMessage()");
final NATSResourceAdapter resourceAdapter = (NATSResourceAdapter) mcf.getResourceAdapter();
- resourceAdapter.sendMessage(channel, message);
+ resourceAdapter.publish(subject, data);
}
}
diff --git a/chatterbox-nats/chatterbox-nats-rar/src/main/rar/META-INF/ra.xml b/chatterbox-nats/chatterbox-nats-rar/src/main/rar/META-INF/ra.xml
index c44905f..3d4ed8e 100644
--- a/chatterbox-nats/chatterbox-nats-rar/src/main/rar/META-INF/ra.xml
+++ b/chatterbox-nats/chatterbox-nats-rar/src/main/rar/META-INF/ra.xml
@@ -22,9 +22,9 @@
xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/connector_1_7.xsd"
version="1.7">
- <description>Chatterbox Slack Connector</description>
- <display-name>Chatterbox Slack Connector</display-name>
- <eis-type>Slack Connector</eis-type>
+ <description>Chatterbox NATS Connector</description>
+ <display-name>Chatterbox NATS Connector</display-name>
+ <eis-type>NATS Connector</eis-type>
<resourceadapter-version>1.0</resourceadapter-version>
<license>
<license-required>false</license-required>
@@ -32,18 +32,18 @@
<resourceadapter>
<resourceadapter-class>org.apache.tomee.chatterbox.nats.adapter.NATSResourceAdapter</resourceadapter-class>
<config-property>
- <config-property-name>token</config-property-name>
+ <config-property-name>baseAddress</config-property-name>
<config-property-type>String</config-property-type>
</config-property>
<outbound-resourceadapter>
<connection-definition>
<managedconnectionfactory-class>org.apache.tomee.chatterbox.nats.adapter.out.NATSManagedConnectionFactory
</managedconnectionfactory-class>
- <connectionfactory-interface>SlackConnectionFactory
+ <connectionfactory-interface>org.apache.tomee.chatterbox.nats.api.NATSConnectionFactory
</connectionfactory-interface>
<connectionfactory-impl-class>org.apache.tomee.chatterbox.nats.adapter.out.NATSConnectionFactoryImpl
</connectionfactory-impl-class>
- <connection-interface>SlackConnection</connection-interface>
+ <connection-interface>org.apache.tomee.chatterbox.nats.api.NATSConnection</connection-interface>
<connection-impl-class>org.apache.tomee.chatterbox.nats.adapter.out.NATSConnectionImpl</connection-impl-class>
</connection-definition>
<transaction-support>NoTransaction</transaction-support>
@@ -52,7 +52,7 @@
<inbound-resourceadapter>
<messageadapter>
<messagelistener>
- <messagelistener-type>InboundListener</messagelistener-type>
+ <messagelistener-type>org.apache.tomee.chatterbox.nats.api.InboundListener</messagelistener-type>
<activationspec>
<activationspec-class>org.apache.tomee.chatterbox.nats.adapter.NATSActivationSpec</activationspec-class>
</activationspec>
diff --git a/chatterbox-nats/chatterbox-nats-sample-war/pom.xml b/chatterbox-nats/chatterbox-nats-sample-war/pom.xml
index aae7a99..25827a3 100644
--- a/chatterbox-nats/chatterbox-nats-sample-war/pom.xml
+++ b/chatterbox-nats/chatterbox-nats-sample-war/pom.xml
@@ -40,6 +40,12 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>io.nats</groupId>
+ <artifactId>java-nats-streaming</artifactId>
+ <version>2.2.3</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>${project.groupId}</groupId>
<artifactId>chatterbox-nats-impl</artifactId>
<version>${project.version}</version>
@@ -89,8 +95,7 @@
<tomeeClassifier>plus</tomeeClassifier>
<libs>
<lib>org.apache.tomee.chatterbox:chatterbox-nats-api:${project.version}:jar</lib>
- <lib>org.tomitribe:tomitribe-util:1.1.0:jar</lib>
- <lib>org.tomitribe:tomitribe-crest-api:0.3:jar</lib>
+ <lib>io.nats:java-nats-streaming:2.2.3:jar</lib>
</libs>
<apps>
<app>org.apache.tomee.chatterbox:chatterbox-nats-rar:${project.version}:rar</app>
diff --git a/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/SystemBean.java b/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/EchoBean.java
similarity index 60%
rename from chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/SystemBean.java
rename to chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/EchoBean.java
index 623b3f4..2c0e81a 100644
--- a/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/SystemBean.java
+++ b/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/EchoBean.java
@@ -18,16 +18,28 @@ package org.superbiz;
import org.apache.tomee.chatterbox.nats.api.InboundListener;
import org.apache.tomee.chatterbox.nats.api.NATSException;
-import org.apache.tomee.chatterbox.nats.api.NATSMessage;
+import io.nats.streaming.Message;
+import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
-@MessageDriven(name = "Receiver")
-public class SystemBean implements InboundListener {
+@MessageDriven(name = "Echo", activationConfig = {
+ @ActivationConfigProperty(propertyName = "subject", propertyValue = "echo")
+})
+public class EchoBean implements InboundListener {
@Override
- public void onMessage(NATSMessage message) throws NATSException {
- // TODO: fill in implementation here
+ public void onMessage(final Message message) throws NATSException {
+ try {
+ final String text = new String(message.getData(), StandardCharsets.UTF_8);
+ System.out.println(text);
+
+ message.ack();
+ } catch (IOException e) {
+ throw new NATSException(e);
+ }
}
}
diff --git a/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/Sender.java b/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/Sender.java
index 7ea3778..fa50cbc 100644
--- a/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/Sender.java
+++ b/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/Sender.java
@@ -29,6 +29,7 @@ import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.MediaType;
+import java.nio.charset.StandardCharsets;
@Singleton
@Lock(LockType.READ)
@@ -38,16 +39,16 @@ public class Sender {
@Resource
private NATSConnectionFactory cf;
- @Path("{channel}")
+ @Path("{subject}")
@POST
@Consumes(MediaType.TEXT_PLAIN)
- public void sendMessage(@PathParam("channel") final String channel, final String message) {
+ public void sendMessage(@PathParam("subject") final String subject, final String message) {
try {
final NATSConnection connection = cf.getConnection();
- connection.sendMessage(channel, message);
+ connection.publish(subject, message.getBytes(StandardCharsets.UTF_8));
connection.close();
- } catch (ResourceException e) {
- // ignore
+ } catch (Throwable t) {
+ // TODO: log this
}
}