You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Ed Kaltenbach (JIRA)" <ji...@apache.org> on 2016/07/14 22:28:20 UTC

[jira] [Updated] (ARTEMIS-630) STOMP server quits sending to all subscribers when one client disconnects

     [ https://issues.apache.org/jira/browse/ARTEMIS-630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ed Kaltenbach updated ARTEMIS-630:
----------------------------------
    Attachment: StompClient.java

> STOMP server quits sending to all subscribers when one client disconnects
> -------------------------------------------------------------------------
>
>                 Key: ARTEMIS-630
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-630
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: Stomp
>    Affects Versions: 1.1.0
>         Environment: Wildfly 10.0.0, Windows 7 64 bit, Java 1.8.0_91
>            Reporter: Ed Kaltenbach
>         Attachments: StompClient.java
>
>
> Multiple clients connected to a JMS topic via STOMP.  When one client disconnects from the server then all clients quit receiving messages and cannot send messages.  As soon as a new client sends a SUBSCRIBE message then all clients begin receiving messages again.
> Here is the way STOMP and the JMS topic is defined in standalone.xml:
> <subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
>             <server name="default">
>                 <security-setting name="#">
>                     <role name="guest" delete-non-durable-queue="true" create-non-durable-queue="true" consume="true" send="true"/>
>                 </security-setting>
>                 <address-setting name="#" message-counter-history-day-limit="10" page-size-bytes="2097152" max-size-bytes="10485760" expiry-address="jms.queue.ExpiryQueue" dead-letter-address="jms.queue.DLQ"/>
>                 <http-connector name="http-connector" endpoint="http-acceptor" socket-binding="http"/>
>                 <http-connector name="http-connector-throughput" endpoint="http-acceptor-throughput" socket-binding="http">
>                     <param name="batch-delay" value="50"/>
>                 </http-connector>
>                 <in-vm-connector name="in-vm" server-id="0"/>
>                 <http-acceptor name="http-acceptor" http-listener="default"/>
>                 <http-acceptor name="http-acceptor-throughput" http-listener="default">
>                     <param name="batch-delay" value="50"/>
>                     <param name="direct-deliver" value="false"/>
>                 </http-acceptor>
>                 <in-vm-acceptor name="in-vm" server-id="0"/>
>                 <acceptor name="stomp-acceptor" factory-class="org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory">
>                     <param name="protocols" value="STOMP"/>
>                     <param name="port" value="61613"/>
>                 </acceptor>
>                 <jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue"/>
>                 <jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/>
>                 <jms-topic name="ACRS_Exit" entries="java:/jms/topic/ACRS_Exit"/>
>                 <connection-factory name="InVmConnectionFactory" entries="java:/ConnectionFactory" connectors="in-vm"/>
>                 <connection-factory name="RemoteConnectionFactory" entries="java:jboss/exported/jms/RemoteConnectionFactory" connectors="http-connector"/>
>                 <pooled-connection-factory name="activemq-ra" transaction="xa" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm"/>
>             </server>
>         </subsystem>
> Run multiple instances of the program listed below.  Stagger the starts by a minute or so.  As soon and one instance of the program completes, all other instances will start having problems sending messages to the topic.  The server will send a response that says "Destination does not exist\c jms.topic.ACRS_Exit".  If you start another instance then all other running instances will start receiving messages and will be able to send messages.
> Here is the code for the sample program:
> import java.io.BufferedReader;
> import java.io.InputStreamReader;
> import java.net.InetSocketAddress;
> import java.net.SocketAddress;
> import java.nio.ByteBuffer;
> import java.nio.channels.AsynchronousSocketChannel;
> import java.nio.channels.CompletionHandler;
> import java.nio.charset.Charset;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.Date;
> import java.util.concurrent.Future;
> public class StompClient {
>   public static void main(String[] args) throws Exception {
>     StompClient foo = new StompClient();
>   }
>   public StompClient() throws Exception {
>     AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
>     SocketAddress serverAddr = new InetSocketAddress("localhost", 61613);
>     Future<Void> result = channel.connect(serverAddr);
>     result.get();
>     System.out.println("Socket Connected");
>     // start two threads, one for reading and one for writing
>     ReaderThread rt = new ReaderThread(channel);
>     Thread readerThread = new Thread(rt);
>     readerThread.start();
>     Thread writerThread = new Thread(new WriterThread(channel));
>     writerThread.start();
>   }
>   protected class ReaderThread implements Runnable {
>     AsynchronousSocketChannel m_channel;
>     public ReaderThread(AsynchronousSocketChannel channel) {
>       m_channel = channel;
>     }
>     public void run() {
>       String outputStr;
>       Integer readByteCnt;
>       while (true) {
>         ByteBuffer buffer = ByteBuffer.allocate(2048);
>         buffer.clear();
>         Future<Integer> result = m_channel.read(buffer);
>         try
>         {
>           readByteCnt = (Integer)result.get();
>         } catch (Exception ex) {
>           readByteCnt = -1;
>         }
>         if (readByteCnt > -1)
>         {
>           // convert the bytes to a string
>           try
>           {
>             outputStr = new String(buffer.array(), "UTF-8");
>             outputStr = outputStr.trim();
>             System.out.println("-----Beginning of Message From Stomp Server:");
>             System.out.println(outputStr);
>             System.out.println("-----Ending of Message From Stomp Server");
>           } catch (Exception ex) {
>             System.out.println("ReaderThread Exception:" + ex.getMessage());
>           }
>         }
>       }
>     }
>   }
>   protected class WriterThread implements Runnable {
>     AsynchronousSocketChannel m_channel;
>     public WriterThread(AsynchronousSocketChannel channel) {
>       m_channel = channel;
>     }
>     public void run() {
>       String topicName = "jms.topic.ACRS_Exit";
>       ByteBuffer buffer = ByteBuffer.allocate(2048);
>       Charset cs = Charset.forName("UTF-8");
>       Integer writtenByteCnt;
>       Future<Integer> result;
>       byte[] data;
>       String msg;
>       try
>       {
>         msg = "CONNECT\n";
>         msg = msg + "accept-version:1.2\n";
>         msg = msg + "heart-beat:5000,5000\n";
>         msg = msg + "login:dynsub\n";
>         msg = msg + "passcode:dynsub\n";
>         msg = msg + "\n";
>         msg = msg + '\0';
>         SendMessageToStomp(m_channel, msg);
>         java.lang.Thread.sleep(5000);
>         msg = "SUBSCRIBE\n";
>         msg = msg + "destination:" + topicName + "\n";
>         msg = msg + "id:dest1\n";
>         msg = msg + "ack:auto\n";
>         msg = msg + "\n";
>         msg = msg + '\0';
>         SendMessageToStomp(m_channel, msg);
>         // send a heartbeat message every 5 seconds
>         // NOTE: this was changed to send the date and time instead of an empty message
>         DateFormat dateFormat = new SimpleDateFormat("yyyy-mm-dd hh:mm:ss");
>         for(int i = 0; i < 20; i++)
>         {
>           java.lang.Thread.sleep(5000);
>           SendMessageToStomp(m_channel, "\n");
>           // NOTE: this was changed to also send a message to the topic
>           Date d = new Date();
>           String dateStr = dateFormat.format(d);
>           msg = "SEND\n";
>           msg += "destination:" + topicName + "\n";
>           msg += "content-type:text/plain\n";
>           msg += "content-length:" + dateStr.length() + "\n";
>           msg += "\n";
>           msg += dateStr;
>           msg += '\0';
>           SendMessageToStomp(m_channel, msg);
>         }
>         java.lang.Thread.sleep(5000);
>         // now test the unsubscribe
>         msg = "UNSUBSCRIBE\n";
>         msg = msg + "id:dest1\n";
>         msg = msg + "\n";
>         msg = msg + '\0';
>         SendMessageToStomp(m_channel, msg);
>         msg = "DISCONNECT\n";
>         msg = msg + '\0';
>         SendMessageToStomp(m_channel, msg);
>         m_channel.shutdownInput();
>         m_channel.shutdownOutput();
>         m_channel.close();
>         System.exit(0);
> /*
>         // if the UNSUBSCRIBE, DISCONNECT, and socket shutdown/close
>         // are removed and the following added, the server will
>         // continue to send messages to other clients for about 20 seconds.
>         // send a heartbeat message every 5 seconds
>         while(true)
>         {
>           java.lang.Thread.sleep(5000);
>           SendMessageToStomp(m_channel, "\n");
>         }
> */
>       } catch (Exception ex) {
>         System.out.println("WriterThread Exception:" + ex.getMessage());
>       }
>     }
>   }
>   protected void SendMessageToStomp(AsynchronousSocketChannel channel, String msg) throws Exception {
>     ByteBuffer buffer = ByteBuffer.allocate(2048);
>     Charset cs = Charset.forName("UTF-8");
>     Integer writtenByteCnt;
>     Future<Integer> result;
>     byte[] data;
>     data = msg.getBytes(cs);
>     buffer.put(data);
>     buffer.flip();
>     System.out.println("-----Sending message to Stomp Server:\n" + msg);
>     result = channel.write(buffer);
>     writtenByteCnt = (Integer)result.get();
>   }
> }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)