You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by "dileepkumar (JIRA)" <ji...@apache.org> on 2013/03/03 03:49:12 UTC

[jira] [Comment Edited] (APLO-302) QOS implementation in apache apollo

    [ https://issues.apache.org/jira/browse/APLO-302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13591604#comment-13591604 ] 

dileepkumar edited comment on APLO-302 at 3/3/13 2:47 AM:
----------------------------------------------------------

I have created the instance of the broker and started the broker...  Below are the pub and sub that I am using .. Please correct me If there is any thing wrong

Subscriber :

import java.io.IOException;
import java.sql.Timestamp;

import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;


public class MQTTListener implements MqttCallback {
    private MqttClient client;
    private MqttConnectOptions mConOpt;
    private String url = "";
   

public MQTTListener(String url, String clientId, String topic, int qos) {
   
    this.url=url;
    try {
        mConOpt = new MqttConnectOptions();
        mConOpt.setCleanSession(false);
        client = new MqttClient(url, clientId);
        client.setCallback(this);
        subscribe(topic, qos);
    } catch (MqttException e) {
        e.printStackTrace();
        // System.exit(1);
    }
   
}
    public void subscribe(String topicName, int qos) throws MqttException {
        // Connect to the server
        client.connect( mConOpt);
        System.out.println("Connected to "+url+" with client ID "+client.getClientId());
        System.out.println("Subscribing to topic \""+topicName+"\" qos "+qos);
        client.subscribe(topicName, qos);
        try {
            System.in.read();
        }catch (IOException e) {
            //If we can't read we'll just exit
        }
        // Disconnect the client
        client.disconnect();
        System.out.println("Disconnected");
    }

    @Override
    public void connectionLost(Throwable arg0) {
        // TODO Auto-generated method stub
        System.out.println("Connection to " + url + " lost!" + arg0);
    }

    @Override
    public void deliveryComplete(MqttDeliveryToken arg0) {
        // TODO Auto-generated method stub
        
    }

    @Override
    public void messageArrived(MqttTopic topic, MqttMessage message)
            throws MqttException {
        // TODO Auto-generated method stub
        String time = new Timestamp(System.currentTimeMillis()).toString();

        System.out.println("Time:\t" +time +
                           "  Topic:\t" + topic.getName() +
                           "  Message:\t" + new String(message.getPayload()) +
                           "  QoS:\t" + message.getQos());
    }
   
    public static void main(String[] args) {
        String broker="localhost";
        String port="1883";
        String clientId="Listener1";
        String topic="AAA";
        String url="";
        int qos=2;
        url="tcp://"+broker+":"+port;
        try{
    new MQTTListener(url, clientId, topic, qos);
        }catch(Exception e){
            e.printStackTrace();
        }
    }

}





Publisher :

import java.sql.Timestamp;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;


public class MQTTSender {
    private String url = "";
    private MqttClient client;
    private MqttConnectOptions mConOpt;
    public MQTTSender(String url,String clientId,int qos,String topic,String message) throws MqttException {
        // TODO Auto-generated constructor stub
        mConOpt = new MqttConnectOptions();
        mConOpt.setCleanSession(false);
        client = new MqttClient(url, clientId, null);       
        publish(topic, qos, message.getBytes());
    }
     public void publish(String topicName, int qos, byte[] payload) throws MqttException {
           
            client.connect( mConOpt);
            System.out.println("Connected to "+url + " with client ID "+client.getClientId());   
            MqttTopic topic = client.getTopic(topicName);
            MqttMessage message = new MqttMessage(payload);
            message.setQos(qos);
               String time = new Timestamp(System.currentTimeMillis()).toString();
            System.out.println("Publishing at: "+time+ " to topic \""+topicName+"\" qos "+qos);
            MqttDeliveryToken token = topic.publish(message);
            token.waitForCompletion();
            client.disconnect();
            System.out.println("Disconnected");

         }
    public static void main(String[] args) {
        String broker="localhost";
        String port="1883";
        String url="";
        String topic="AAA";
        String clientId="Sender1";
        String message="Hello World";
        int qos=2;
        url="tcp://"+broker+":"+port;
        try {
            new MQTTSender(url, clientId, qos, topic, message);
        } catch (MqttException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

                
      was (Author: dileeprdk):
    Subscriber :

import java.io.IOException;
import java.sql.Timestamp;

import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;


public class MQTTListener implements MqttCallback {
    private MqttClient client;
    private MqttConnectOptions mConOpt;
    private String url = "";
   

public MQTTListener(String url, String clientId, String topic, int qos) {
   
    this.url=url;
    try {
        mConOpt = new MqttConnectOptions();
        mConOpt.setCleanSession(false);
        client = new MqttClient(url, clientId);
        client.setCallback(this);
        subscribe(topic, qos);
    } catch (MqttException e) {
        e.printStackTrace();
        // System.exit(1);
    }
   
}
    public void subscribe(String topicName, int qos) throws MqttException {
        // Connect to the server
        client.connect();
        System.out.println("Connected to "+url+" with client ID "+client.getClientId());
        System.out.println("Subscribing to topic \""+topicName+"\" qos "+qos);
        client.subscribe(topicName, qos);
        try {
            System.in.read();
        }catch (IOException e) {
            //If we can't read we'll just exit
        }
        // Disconnect the client
        client.disconnect();
        System.out.println("Disconnected");
    }

    @Override
    public void connectionLost(Throwable arg0) {
        // TODO Auto-generated method stub
        System.out.println("Connection to " + url + " lost!" + arg0);
    }

    @Override
    public void deliveryComplete(MqttDeliveryToken arg0) {
        // TODO Auto-generated method stub
        
    }

    @Override
    public void messageArrived(MqttTopic topic, MqttMessage message)
            throws MqttException {
        // TODO Auto-generated method stub
        String time = new Timestamp(System.currentTimeMillis()).toString();

        System.out.println("Time:\t" +time +
                           "  Topic:\t" + topic.getName() +
                           "  Message:\t" + new String(message.getPayload()) +
                           "  QoS:\t" + message.getQos());
    }
   
    public static void main(String[] args) {
        String broker="localhost";
        String port="1883";
        String clientId="Listener1";
        String topic="AAA";
        String url="";
        int qos=2;
        url="tcp://"+broker+":"+port;
        try{
    new MQTTListener(url, clientId, topic, qos);
        }catch(Exception e){
            e.printStackTrace();
        }
    }

}





Publisher :

import java.sql.Timestamp;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;


public class MQTTSender {
    private String url = "";
    private MqttClient client;
    private MqttConnectOptions mConOpt;
    public MQTTSender(String url,String clientId,int qos,String topic,String message) throws MqttException {
        // TODO Auto-generated constructor stub
        mConOpt = new MqttConnectOptions();
        mConOpt.setCleanSession(false);
        client = new MqttClient(url, clientId, null);       
        publish(topic, qos, message.getBytes());
    }
     public void publish(String topicName, int qos, byte[] payload) throws MqttException {
           
            client.connect();
            System.out.println("Connected to "+url + " with client ID "+client.getClientId());   
            MqttTopic topic = client.getTopic(topicName);
            MqttMessage message = new MqttMessage(payload);
            message.setQos(qos);
               String time = new Timestamp(System.currentTimeMillis()).toString();
            System.out.println("Publishing at: "+time+ " to topic \""+topicName+"\" qos "+qos);
            MqttDeliveryToken token = topic.publish(message);
            token.waitForCompletion();
            client.disconnect();
            System.out.println("Disconnected");

         }
    public static void main(String[] args) {
        String broker="localhost";
        String port="1883";
        String url="";
        String topic="AAA";
        String clientId="Sender1";
        String message="Hello World";
        int qos=2;
        url="tcp://"+broker+":"+port;
        try {
            new MQTTSender(url, clientId, qos, topic, message);
        } catch (MqttException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

                  
> QOS implementation in apache apollo
> -----------------------------------
>
>                 Key: APLO-302
>                 URL: https://issues.apache.org/jira/browse/APLO-302
>             Project: ActiveMQ Apollo
>          Issue Type: Bug
>          Components: apollo-broker
>    Affects Versions: 1.5, 1.6
>         Environment: Windows 7
>            Reporter: dileepkumar
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> My aim is to retrieve the messages that are published when the subscriber is not connected to the server.  I have tried with Mosquitto broker and paho client , I have received all the missed out messages there was no problem. But when I used apache apollo in place of mosquitto broker then I am not able to receive the missed out messages. But when I restart the broker(apache apollo) those message are been published.  Is it the fault of the broker or any configuration changes that I need to make specific for apache apollo.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira