You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by sonikrish <so...@gmail.com> on 2018/01/03 06:30:35 UTC

Mqtt pending running 2 durable subscriber for same topic but messages are not delievered to one of the subscriber after broker restart

package activeMQ;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class Subscriber implements MqttCallback {
	
	 public void messageArrived(String topic, MqttMessage message) throws
Exception
	 	{
         System.out.println("\nReceived Message is : " + new
String(message.getPayload()));    
	 	}
	 
	 public void connectionLost(Throwable arg0) {
			
			System.out.println("\nConnection lost");
		}
		
		public void deliveryComplete(IMqttDeliveryToken arg0) {
			
			System.out.println("\nDelivery complete");
		}

	 public static void main(String arg[]) {
			
			Subscriber s= new Subscriber();
			s.subs();
		}
		
	 public void subs(){
		 String topic        = "MQTT_publisher";          
	        int qos             = 2;
	        String broker       = <url>;
	        String clientId     = <ClientID>;
	        String username	  = <username>;
	        String password	  = <password>;
	       // IMqttMessageListener messageListner = null;
	        MemoryPersistence persistence = new MemoryPersistence();
	        
		 try {
	            MqttClient sampleClient = new MqttClient(broker, clientId,
persistence);
	            MqttConnectOptions connOpts = new MqttConnectOptions();
	            connOpts.setCleanSession(false);
	            connOpts.setUserName(username);
	            connOpts.setPassword(password.toCharArray());
	            System.out.println("Connecting to broker: "+broker);
	            sampleClient.connect(connOpts);   
	            if(sampleClient.isConnected()==true)
	            {
	            System.out.println("Connected");
	            sampleClient.subscribe(topic, qos);
	            
	            System.out.println("Message subscribed");
	            
	            sampleClient.setCallback(this);
	            sampleClient.connect();
	          }
	            
	            sampleClient.disconnect();
	            System.out.println("Disconnected");
	        }
	        catch(MqttException me)
	        {
	            System.out.println("reason "+me.getReasonCode());
	        }
	 }
}

Both subscriber is having the same properties and ClientID is different for
both subscriber. Also, only one of them is getting message after broker
restart everytime and another one is not getting any messages despite of
being durable subscriber. 

When i am logging into activeMQ console and watching under subscriber for
one which is not getting msg, its Dispatched Queue Size is increasing along
with Enqueue Counter but Dequeue Counter  is not increasing.



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-Dev-f2368404.html

Re: Mqtt pending running 2 durable subscriber for same topic but messages are not delievered to one of the subscriber after broker restart

Posted by Arthur Naseef <ar...@amlinv.com>.
In the console, can you see the connection for the inactive subscriber?

On Wed, Jan 3, 2018 at 12:30 AM, sonikrish <so...@gmail.com> wrote:

> package activeMQ;
>
> import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
> import org.eclipse.paho.client.mqttv3.MqttCallback;
> import org.eclipse.paho.client.mqttv3.MqttClient;
> import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
> import org.eclipse.paho.client.mqttv3.MqttException;
> import org.eclipse.paho.client.mqttv3.MqttMessage;
> import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
>
> public class Subscriber implements MqttCallback {
>
>          public void messageArrived(String topic, MqttMessage message)
> throws
> Exception
>                 {
>          System.out.println("\nReceived Message is : " + new
> String(message.getPayload()));
>                 }
>
>          public void connectionLost(Throwable arg0) {
>
>                         System.out.println("\nConnection lost");
>                 }
>
>                 public void deliveryComplete(IMqttDeliveryToken arg0) {
>
>                         System.out.println("\nDelivery complete");
>                 }
>
>          public static void main(String arg[]) {
>
>                         Subscriber s= new Subscriber();
>                         s.subs();
>                 }
>
>          public void subs(){
>                  String topic        = "MQTT_publisher";
>                 int qos             = 2;
>                 String broker       = <url>;
>                 String clientId     = <ClientID>;
>                 String username   = <username>;
>                 String password   = <password>;
>                // IMqttMessageListener messageListner = null;
>                 MemoryPersistence persistence = new MemoryPersistence();
>
>                  try {
>                     MqttClient sampleClient = new MqttClient(broker,
> clientId,
> persistence);
>                     MqttConnectOptions connOpts = new MqttConnectOptions();
>                     connOpts.setCleanSession(false);
>                     connOpts.setUserName(username);
>                     connOpts.setPassword(password.toCharArray());
>                     System.out.println("Connecting to broker: "+broker);
>                     sampleClient.connect(connOpts);
>                     if(sampleClient.isConnected()==true)
>                     {
>                     System.out.println("Connected");
>                     sampleClient.subscribe(topic, qos);
>
>                     System.out.println("Message subscribed");
>
>                     sampleClient.setCallback(this);
>                     sampleClient.connect();
>                   }
>
>                     sampleClient.disconnect();
>                     System.out.println("Disconnected");
>                 }
>                 catch(MqttException me)
>                 {
>                     System.out.println("reason "+me.getReasonCode());
>                 }
>          }
> }
>
> Both subscriber is having the same properties and ClientID is different for
> both subscriber. Also, only one of them is getting message after broker
> restart everytime and another one is not getting any messages despite of
> being durable subscriber.
>
> When i am logging into activeMQ console and watching under subscriber for
> one which is not getting msg, its Dispatched Queue Size is increasing along
> with Enqueue Counter but Dequeue Counter  is not increasing.
>
>
>
> --
> Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-Dev-
> f2368404.html
>