You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by jg...@apache.org on 2021/06/16 13:36:21 UTC

[tomee-chatterbox] 02/06: WIP

This is an automated email from the ASF dual-hosted git repository.

jgallimore pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomee-chatterbox.git

commit ccb9450089f0d3c6dbf39e8455c53e6eb7271fe1
Author: Jonathan Gallimore <jo...@jrg.me.uk>
AuthorDate: Wed Jun 9 11:44:40 2021 +0100

    WIP
---
 chatterbox-nats/chatterbox-nats-api/pom.xml        |   6 ++
 .../tomee/chatterbox/nats/api/InboundListener.java |   4 +-
 .../tomee/chatterbox/nats/api/NATSConnection.java  |   2 +-
 .../tomee/chatterbox/nats/api/NATSException.java   |  18 ++++
 .../tomee/chatterbox/nats/api/NATSMessage.java     |  23 -----
 chatterbox-nats/chatterbox-nats-impl/pom.xml       |   1 +
 .../nats/adapter/NATSActivationSpec.java           |   9 ++
 .../nats/adapter/NATSResourceAdapter.java          | 101 +++++++++++++++++----
 .../nats/adapter/out/NATSConnectionImpl.java       |   5 +-
 .../nats/adapter/out/NATSManagedConnection.java    |   5 +-
 .../src/main/rar/META-INF/ra.xml                   |  14 +--
 chatterbox-nats/chatterbox-nats-sample-war/pom.xml |   9 +-
 .../superbiz/{SystemBean.java => EchoBean.java}    |  22 ++++-
 .../src/main/java/org/superbiz/Sender.java         |  11 ++-
 14 files changed, 166 insertions(+), 64 deletions(-)

diff --git a/chatterbox-nats/chatterbox-nats-api/pom.xml b/chatterbox-nats/chatterbox-nats-api/pom.xml
index 6f4ca62..db16e68 100644
--- a/chatterbox-nats/chatterbox-nats-api/pom.xml
+++ b/chatterbox-nats/chatterbox-nats-api/pom.xml
@@ -45,5 +45,11 @@
       <artifactId>javaee-api</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>io.nats</groupId>
+      <artifactId>java-nats-streaming</artifactId>
+      <version>2.2.3</version>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/InboundListener.java b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/InboundListener.java
index 5f7ea6c..94d6294 100644
--- a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/InboundListener.java
+++ b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/InboundListener.java
@@ -16,11 +16,13 @@
  */
 package org.apache.tomee.chatterbox.nats.api;
 
+import io.nats.streaming.Message;
+
 /**
  * @version $Revision$ $Date$
  */
 public interface InboundListener {
 
-    public void onMessage(final NATSMessage message) throws NATSException;
+    public void onMessage(final Message message) throws NATSException;
 
 }
diff --git a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSConnection.java b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSConnection.java
index e323dd1..8b39874 100755
--- a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSConnection.java
+++ b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSConnection.java
@@ -20,7 +20,7 @@
 package org.apache.tomee.chatterbox.nats.api;
 
 public interface NATSConnection {
-    public void sendMessage(final String channel, final String message);
+    public void publish(final String subject, final byte[] data) throws NATSException;
 
     public void close();
 }
diff --git a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSException.java b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSException.java
index 031ad42..b761cd3 100644
--- a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSException.java
+++ b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSException.java
@@ -20,4 +20,22 @@ package org.apache.tomee.chatterbox.nats.api;
  * @version $Revision$ $Date$
  */
 public class NATSException extends Exception {
+    public NATSException() {
+    }
+
+    public NATSException(String message) {
+        super(message);
+    }
+
+    public NATSException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public NATSException(Throwable cause) {
+        super(cause);
+    }
+
+    public NATSException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
 }
diff --git a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSMessage.java b/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSMessage.java
deleted file mode 100644
index afb7718..0000000
--- a/chatterbox-nats/chatterbox-nats-api/src/main/java/org/apache/tomee/chatterbox/nats/api/NATSMessage.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tomee.chatterbox.nats.api;
-
-/**
- * @version $Revision$ $Date$
- */
-public class NATSMessage {
-}
diff --git a/chatterbox-nats/chatterbox-nats-impl/pom.xml b/chatterbox-nats/chatterbox-nats-impl/pom.xml
index fbe5f30..d46e53a 100644
--- a/chatterbox-nats/chatterbox-nats-impl/pom.xml
+++ b/chatterbox-nats/chatterbox-nats-impl/pom.xml
@@ -59,6 +59,7 @@
       <groupId>io.nats</groupId>
       <artifactId>java-nats-streaming</artifactId>
       <version>2.2.3</version>
+      <scope>provided</scope>
     </dependency>
   </dependencies>
 </project>
diff --git a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSActivationSpec.java b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSActivationSpec.java
index 4b62580..1a2fcc4 100644
--- a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSActivationSpec.java
+++ b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSActivationSpec.java
@@ -29,6 +29,7 @@ public class NATSActivationSpec implements ActivationSpec {
 
     private ResourceAdapter resourceAdapter;
     private Class beanClass;
+    private String subject;
 
     public Class getBeanClass() {
         return beanClass;
@@ -38,6 +39,14 @@ public class NATSActivationSpec implements ActivationSpec {
         this.beanClass = beanClass;
     }
 
+    public String getSubject() {
+        return subject;
+    }
+
+    public void setSubject(String subject) {
+        this.subject = subject;
+    }
+
     @Override
     public void validate() throws InvalidPropertyException {
     }
diff --git a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSResourceAdapter.java b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSResourceAdapter.java
index 54d3a62..804a24f 100644
--- a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSResourceAdapter.java
+++ b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/NATSResourceAdapter.java
@@ -16,6 +16,15 @@
  */
 package org.apache.tomee.chatterbox.nats.adapter;
 
+import io.nats.streaming.Message;
+import io.nats.streaming.MessageHandler;
+import io.nats.streaming.Options;
+import io.nats.streaming.StreamingConnection;
+import io.nats.streaming.StreamingConnectionFactory;
+import io.nats.streaming.Subscription;
+import org.apache.tomee.chatterbox.nats.api.InboundListener;
+import org.apache.tomee.chatterbox.nats.api.NATSException;
+
 import javax.resource.ResourceException;
 import javax.resource.spi.ActivationSpec;
 import javax.resource.spi.BootstrapContext;
@@ -28,6 +37,8 @@ import javax.resource.spi.endpoint.MessageEndpointFactory;
 import javax.resource.spi.work.Work;
 import javax.resource.spi.work.WorkManager;
 import javax.transaction.xa.XAResource;
+import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.lang.IllegalStateException;
@@ -36,20 +47,43 @@ import java.lang.IllegalStateException;
 public class NATSResourceAdapter implements ResourceAdapter {
     final Map<NATSActivationSpec, EndpointTarget> targets = new ConcurrentHashMap<NATSActivationSpec, EndpointTarget>();
 
+    private static final Method ONMESSAGE;
+
+    static {
+        try {
+            ONMESSAGE = InboundListener.class.getMethod("onMessage", Message.class);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @ConfigProperty
-    private String token;
+    private String baseAddress;
 
     private WorkManager workManager;
-    private String user;
-    private String userId;
+    private StreamingConnectionFactory cf;
+    private StreamingConnection connection;
 
     public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
         workManager = bootstrapContext.getWorkManager();
-        // connect to NATS
+
+        try {
+            cf = new
+                    StreamingConnectionFactory(new Options.Builder().natsUrl(baseAddress)
+                    .clusterId("cluster-id").clientId("client-id").build());
+
+            connection = cf.createConnection();
+        } catch (Throwable t) {
+            // TODO: log this
+        }
     }
 
     public void stop() {
-        // disconnect
+        try {
+            connection.close();
+        } catch (Throwable t) {
+            // TODO: log this
+        }
     }
 
     public void endpointActivation(final MessageEndpointFactory messageEndpointFactory, final ActivationSpec activationSpec)
@@ -64,11 +98,10 @@ public class NATSResourceAdapter implements ResourceAdapter {
                     final MessageEndpoint messageEndpoint = messageEndpointFactory.createEndpoint(null);
 
                     final EndpointTarget target = new EndpointTarget(messageEndpoint);
-                    final Class<?> endpointClass = NATSActivationSpec.getBeanClass() != null ? NATSActivationSpec
-                            .getBeanClass() : messageEndpointFactory.getEndpointClass();
-
-
                     targets.put(NATSActivationSpec, target);
+
+                    final Subscription subscription = connection.subscribe(((NATSActivationSpec) activationSpec).getSubject(), target);
+                    target.setSubscription(subscription);
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
@@ -83,15 +116,14 @@ public class NATSResourceAdapter implements ResourceAdapter {
     }
 
     public void endpointDeactivation(MessageEndpointFactory messageEndpointFactory, ActivationSpec activationSpec) {
-        final NATSActivationSpec telnetActivationSpec = (NATSActivationSpec) activationSpec;
+        final NATSActivationSpec natsActivationSpec = (NATSActivationSpec) activationSpec;
 
-        final EndpointTarget endpointTarget = targets.get(telnetActivationSpec);
+        final EndpointTarget endpointTarget = targets.get(natsActivationSpec);
         if (endpointTarget == null) {
             throw new IllegalStateException("No EndpointTarget to undeploy for ActivationSpec " + activationSpec);
         }
 
-        // unsubscribe
-
+        endpointTarget.close();
         endpointTarget.messageEndpoint.release();
     }
 
@@ -99,17 +131,54 @@ public class NATSResourceAdapter implements ResourceAdapter {
         return new XAResource[0];
     }
 
-    public void sendMessage(final String channel, final String message) {
+    public void publish(final String subject, final byte[] data) throws NATSException {
         // publish a message
+        try {
+            connection.publish(subject, data);
+        } catch (Exception e) {
+            throw new NATSException(e);
+        }
     }
 
 
-    private static class EndpointTarget {
+    private static class EndpointTarget implements MessageHandler {
         private final MessageEndpoint messageEndpoint;
+        private Subscription subscription;
 
-        public EndpointTarget(MessageEndpoint messageEndpoint) {
+        public EndpointTarget(final MessageEndpoint messageEndpoint) {
             this.messageEndpoint = messageEndpoint;
         }
 
+        @Override
+        public void onMessage(final Message msg) {
+            try {
+                try {
+                    messageEndpoint.beforeDelivery(ONMESSAGE);
+                    ((InboundListener) messageEndpoint).onMessage(msg);
+                } finally {
+                    messageEndpoint.afterDelivery();
+                }
+            } catch (Throwable t) {
+                // TODO: log this
+            }
+        }
+
+        public void setSubscription(final Subscription subscription) {
+            this.subscription = subscription;
+        }
+
+        public Subscription getSubscription() {
+            return subscription;
+        }
+
+        public void close() {
+            try {
+                if (subscription != null) {
+                    subscription.close(true);
+                }
+            } catch (IOException e) {
+                // TODO: log this
+            }
+        }
     }
 }
diff --git a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSConnectionImpl.java b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSConnectionImpl.java
index 8b375f7..d743b9f 100755
--- a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSConnectionImpl.java
+++ b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSConnectionImpl.java
@@ -19,6 +19,7 @@
 package org.apache.tomee.chatterbox.nats.adapter.out;
 
 import org.apache.tomee.chatterbox.nats.api.NATSConnection;
+import org.apache.tomee.chatterbox.nats.api.NATSException;
 
 import java.util.logging.Logger;
 
@@ -34,8 +35,8 @@ public class NATSConnectionImpl implements NATSConnection {
         this.mcf = mcf;
     }
 
-    public void sendMessage(final String channel, final String message) {
-        mc.sendMessage(channel, message);
+    public void publish(final String subject, final byte[] data) throws NATSException {
+        mc.publish(subject, data);
     }
 
     public void close() {
diff --git a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSManagedConnection.java b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSManagedConnection.java
index 0999036..d7043b2 100755
--- a/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSManagedConnection.java
+++ b/chatterbox-nats/chatterbox-nats-impl/src/main/java/org/apache/tomee/chatterbox/nats/adapter/out/NATSManagedConnection.java
@@ -20,6 +20,7 @@ package org.apache.tomee.chatterbox.nats.adapter.out;
 
 import org.apache.tomee.chatterbox.nats.adapter.NATSResourceAdapter;
 import org.apache.tomee.chatterbox.nats.api.NATSConnection;
+import org.apache.tomee.chatterbox.nats.api.NATSException;
 
 import javax.resource.NotSupportedException;
 import javax.resource.ResourceException;
@@ -131,10 +132,10 @@ public class NATSManagedConnection implements ManagedConnection {
         return new NATSManagedConnectionMetaData();
     }
 
-    void sendMessage(final String channel, final String message) {
+    void publish(final String subject, final byte[] data) throws NATSException {
         log.finest("sendMessage()");
 
         final NATSResourceAdapter resourceAdapter = (NATSResourceAdapter) mcf.getResourceAdapter();
-        resourceAdapter.sendMessage(channel, message);
+        resourceAdapter.publish(subject, data);
     }
 }
diff --git a/chatterbox-nats/chatterbox-nats-rar/src/main/rar/META-INF/ra.xml b/chatterbox-nats/chatterbox-nats-rar/src/main/rar/META-INF/ra.xml
index c44905f..3d4ed8e 100644
--- a/chatterbox-nats/chatterbox-nats-rar/src/main/rar/META-INF/ra.xml
+++ b/chatterbox-nats/chatterbox-nats-rar/src/main/rar/META-INF/ra.xml
@@ -22,9 +22,9 @@
            xmlns="http://xmlns.jcp.org/xml/ns/javaee"
            xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/connector_1_7.xsd"
            version="1.7">
-  <description>Chatterbox Slack Connector</description>
-  <display-name>Chatterbox Slack Connector</display-name>
-  <eis-type>Slack Connector</eis-type>
+  <description>Chatterbox NATS Connector</description>
+  <display-name>Chatterbox NATS Connector</display-name>
+  <eis-type>NATS Connector</eis-type>
   <resourceadapter-version>1.0</resourceadapter-version>
   <license>
     <license-required>false</license-required>
@@ -32,18 +32,18 @@
   <resourceadapter>
     <resourceadapter-class>org.apache.tomee.chatterbox.nats.adapter.NATSResourceAdapter</resourceadapter-class>
     <config-property>
-      <config-property-name>token</config-property-name>
+      <config-property-name>baseAddress</config-property-name>
       <config-property-type>String</config-property-type>
     </config-property>
     <outbound-resourceadapter>
       <connection-definition>
         <managedconnectionfactory-class>org.apache.tomee.chatterbox.nats.adapter.out.NATSManagedConnectionFactory
         </managedconnectionfactory-class>
-        <connectionfactory-interface>SlackConnectionFactory
+        <connectionfactory-interface>org.apache.tomee.chatterbox.nats.api.NATSConnectionFactory
         </connectionfactory-interface>
         <connectionfactory-impl-class>org.apache.tomee.chatterbox.nats.adapter.out.NATSConnectionFactoryImpl
         </connectionfactory-impl-class>
-        <connection-interface>SlackConnection</connection-interface>
+        <connection-interface>org.apache.tomee.chatterbox.nats.api.NATSConnection</connection-interface>
         <connection-impl-class>org.apache.tomee.chatterbox.nats.adapter.out.NATSConnectionImpl</connection-impl-class>
       </connection-definition>
       <transaction-support>NoTransaction</transaction-support>
@@ -52,7 +52,7 @@
     <inbound-resourceadapter>
       <messageadapter>
         <messagelistener>
-          <messagelistener-type>InboundListener</messagelistener-type>
+          <messagelistener-type>org.apache.tomee.chatterbox.nats.api.InboundListener</messagelistener-type>
           <activationspec>
             <activationspec-class>org.apache.tomee.chatterbox.nats.adapter.NATSActivationSpec</activationspec-class>
           </activationspec>
diff --git a/chatterbox-nats/chatterbox-nats-sample-war/pom.xml b/chatterbox-nats/chatterbox-nats-sample-war/pom.xml
index aae7a99..25827a3 100644
--- a/chatterbox-nats/chatterbox-nats-sample-war/pom.xml
+++ b/chatterbox-nats/chatterbox-nats-sample-war/pom.xml
@@ -40,6 +40,12 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>io.nats</groupId>
+      <artifactId>java-nats-streaming</artifactId>
+      <version>2.2.3</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>chatterbox-nats-impl</artifactId>
       <version>${project.version}</version>
@@ -89,8 +95,7 @@
           <tomeeClassifier>plus</tomeeClassifier>
           <libs>
             <lib>org.apache.tomee.chatterbox:chatterbox-nats-api:${project.version}:jar</lib>
-            <lib>org.tomitribe:tomitribe-util:1.1.0:jar</lib>
-            <lib>org.tomitribe:tomitribe-crest-api:0.3:jar</lib>
+            <lib>io.nats:java-nats-streaming:2.2.3:jar</lib>
           </libs>
           <apps>
             <app>org.apache.tomee.chatterbox:chatterbox-nats-rar:${project.version}:rar</app>
diff --git a/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/SystemBean.java b/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/EchoBean.java
similarity index 60%
rename from chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/SystemBean.java
rename to chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/EchoBean.java
index 623b3f4..2c0e81a 100644
--- a/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/SystemBean.java
+++ b/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/EchoBean.java
@@ -18,16 +18,28 @@ package org.superbiz;
 
 import org.apache.tomee.chatterbox.nats.api.InboundListener;
 import org.apache.tomee.chatterbox.nats.api.NATSException;
-import org.apache.tomee.chatterbox.nats.api.NATSMessage;
+import io.nats.streaming.Message;
 
+import javax.ejb.ActivationConfigProperty;
 import javax.ejb.MessageDriven;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 
-@MessageDriven(name = "Receiver")
-public class SystemBean implements InboundListener {
+@MessageDriven(name = "Echo", activationConfig = {
+        @ActivationConfigProperty(propertyName = "subject", propertyValue = "echo")
+})
+public class EchoBean implements InboundListener {
 
 
     @Override
-    public void onMessage(NATSMessage message) throws NATSException {
-        // TODO: fill in implementation here
+    public void onMessage(final Message message) throws NATSException {
+        try {
+            final String text = new String(message.getData(), StandardCharsets.UTF_8);
+            System.out.println(text);
+
+            message.ack();
+        } catch (IOException e) {
+            throw new NATSException(e);
+        }
     }
 }
diff --git a/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/Sender.java b/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/Sender.java
index 7ea3778..fa50cbc 100644
--- a/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/Sender.java
+++ b/chatterbox-nats/chatterbox-nats-sample-war/src/main/java/org/superbiz/Sender.java
@@ -29,6 +29,7 @@ import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.core.MediaType;
+import java.nio.charset.StandardCharsets;
 
 @Singleton
 @Lock(LockType.READ)
@@ -38,16 +39,16 @@ public class Sender {
     @Resource
     private NATSConnectionFactory cf;
 
-    @Path("{channel}")
+    @Path("{subject}")
     @POST
     @Consumes(MediaType.TEXT_PLAIN)
-    public void sendMessage(@PathParam("channel") final String channel, final String message) {
+    public void sendMessage(@PathParam("subject") final String subject, final String message) {
         try {
             final NATSConnection connection = cf.getConnection();
-            connection.sendMessage(channel, message);
+            connection.publish(subject, message.getBytes(StandardCharsets.UTF_8));
             connection.close();
-        } catch (ResourceException e) {
-            // ignore
+        } catch (Throwable t) {
+            // TODO: log this
         }
     }