You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by unknown <mt...@gmail.com> on 2020/02/07 12:43:59 UTC

activemq and the retain-flag

How can I receive retained messages using ActiveMQ and Camel?
Currently I have to call setUseRetroactiveConsumer(true) on the
ActiveMQConnectionFactory myself (see the code below).
This works, but there must be a better way.

best regards, mtmmtm

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.camel.*;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsConfiguration;
import org.apache.camel.component.jms.JmsEndpoint;
import org.apache.camel.impl.DefaultCamelContext;

public class SendMQTTTest {
    public void configure() throws Exception {
        String mqtt =
"activemq:topic:test?brokerURL=tcp://localhost:61610&allowAdditionalHeaders=true";
        CamelContext camelContext = new DefaultCamelContext();
        addMqtt(mqtt, camelContext);
        camelContext.start();
        enableRetain(camelContext);
        camelContext.startRoute("mqtt");
    }

    private void addMqtt(String mqtt, CamelContext camelContext) throws
Exception {
        Processor print = exchange -> {
            Message camelMessage = exchange.getIn();
            Object retain_obj = camelMessage.getHeader("ActiveMQ.Retained");
            boolean retain = retain_obj != null ? (Boolean) retain_obj :
false;
            System.out.println("retain=" + retain);
        };
        camelContext.addRoutes(new RouteBuilder() {
            public void configure() {

from(mqtt).routeId("mqtt").autoStartup(false).process(print);
            }
        });
    }

    private void enableRetain(CamelContext context) {
        for (Route r : context.getRoutes()) {
            Consumer c = r.getConsumer();
            if (c.getEndpoint() instanceof JmsEndpoint) {
                JmsEndpoint ep = (JmsEndpoint) c.getEndpoint();
                JmsConfiguration cfg = ep.getConfiguration();
                if (cfg.getConnectionFactory() instanceof
PooledConnectionFactory) {
                    PooledConnectionFactory fact =
(PooledConnectionFactory) cfg.getConnectionFactory();
                    ActiveMQConnectionFactory a =
(ActiveMQConnectionFactory) fact.getConnectionFactory();
                    a.setUseRetroactiveConsumer(true);
                }
            }
        }
    }

    public static void main(String... args) throws Exception {
        SendMQTTTest main = new SendMQTTTest();
        main.configure();
        Thread.sleep(10000000);
    }
}

Re: activemq and the retain-flag

Posted by unknown <mt...@gmail.com>.
I did not find any documentation on how to do this using only parameters on
the component.
I had to write all of that code in order to set the flag before starting the
component. Could you show me how to do it with minimal Java code?

On Fri, Feb 7, 2020 at 10:02 PM Claus Ibsen <cl...@gmail.com> wrote:

> Hi
>
> You create the ActiveMQ CF and set all its options as you like, also
> the AMQ specific option about retain, and then configure the activemq
> camel component to use your CF
>
> On Fri, Feb 7, 2020 at 1:59 PM unknown <mt...@gmail.com> wrote:
> >
> > How can i get retained-messages using activemq and camel ?
> > I must set the flag setUseRetroactiveConsumer on the
> > activemqconnectionfactory.
> > This works, but there must be a better way.
> >
> > best regards, mtmmtm
> >
> > import org.apache.activemq.ActiveMQConnectionFactory;
> > import org.apache.activemq.camel.component.ActiveMQComponent;
> > import org.apache.activemq.pool.PooledConnectionFactory;
> > import org.apache.camel.*;
> > import org.apache.camel.builder.RouteBuilder;
> > import org.apache.camel.component.jms.JmsConfiguration;
> > import org.apache.camel.component.jms.JmsEndpoint;
> > import org.apache.camel.impl.DefaultCamelContext;
> >
> > public class SendMQTTTest {
> >     public void configure() throws Exception {
> >         String mqtt =
> >
> "activemq:topic:test?brokerURL=tcp://localhost:61610&allowAdditionalHeaders=true";
> >         CamelContext camelContext = new DefaultCamelContext();
> >         addMqtt(mqtt, camelContext);
> >         camelContext.start();
> >         enableRetain(camelContext);
> >         camelContext.startRoute("mqtt");
> >     }
> >
> >     private void addMqtt(String mqtt, CamelContext camelContext) throws
> > Exception {
> >         Processor print = exchange -> {
> >             Message camelMessage = exchange.getIn();
> >             Object retain_obj =
> camelMessage.getHeader("ActiveMQ.Retained");
> >             boolean retain = retain_obj != null ? (Boolean) retain_obj :
> > false;
> >             System.out.println("retain=" + retain);
> >         };
> >         camelContext.addRoutes(new RouteBuilder() {
> >             public void configure() {
> >
> > from(mqtt).routeId("mqtt").autoStartup(false).process(print);
> >             }
> >         });
> >     }
> >
> >     private void enableRetain(CamelContext context) {
> >         for (Route r : context.getRoutes()) {
> >             Consumer c = r.getConsumer();
> >             if (c.getEndpoint() instanceof JmsEndpoint) {
> >                 JmsEndpoint ep = (JmsEndpoint) c.getEndpoint();
> >                 JmsConfiguration cfg = ep.getConfiguration();
> >                 if (cfg.getConnectionFactory() instanceof
> > PooledConnectionFactory) {
> >                     PooledConnectionFactory fact =
> > (PooledConnectionFactory) cfg.getConnectionFactory();
> >                     ActiveMQConnectionFactory a =
> > (ActiveMQConnectionFactory) fact.getConnectionFactory();
> >                     a.setUseRetroactiveConsumer(true);
> >                 }
> >             }
> >         }
> >     }
> >
> >     public static void main(String... args) throws Exception {
> >         SendMQTTTest main = new SendMQTTTest();
> >         main.configure();
> >         Thread.sleep(10000000);
> >     }
> > }
>
>
>
> --
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2
>

Re: activemq and the retain-flag

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Create the ActiveMQ ConnectionFactory yourself and set all its options as
you like, including the ActiveMQ-specific option about retain, and then
configure the Camel activemq component to use your ConnectionFactory.

On Fri, Feb 7, 2020 at 1:59 PM unknown <mt...@gmail.com> wrote:
>
> How can i get retained-messages using activemq and camel ?
> I must set the flag setUseRetroactiveConsumer on the
> activemqconnectionfactory.
> This works, but there must be a better way.
>
> best regards, mtmmtm
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.camel.component.ActiveMQComponent;
> import org.apache.activemq.pool.PooledConnectionFactory;
> import org.apache.camel.*;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.jms.JmsConfiguration;
> import org.apache.camel.component.jms.JmsEndpoint;
> import org.apache.camel.impl.DefaultCamelContext;
>
> public class SendMQTTTest {
>     public void configure() throws Exception {
>         String mqtt =
> "activemq:topic:test?brokerURL=tcp://localhost:61610&allowAdditionalHeaders=true";
>         CamelContext camelContext = new DefaultCamelContext();
>         addMqtt(mqtt, camelContext);
>         camelContext.start();
>         enableRetain(camelContext);
>         camelContext.startRoute("mqtt");
>     }
>
>     private void addMqtt(String mqtt, CamelContext camelContext) throws
> Exception {
>         Processor print = exchange -> {
>             Message camelMessage = exchange.getIn();
>             Object retain_obj = camelMessage.getHeader("ActiveMQ.Retained");
>             boolean retain = retain_obj != null ? (Boolean) retain_obj :
> false;
>             System.out.println("retain=" + retain);
>         };
>         camelContext.addRoutes(new RouteBuilder() {
>             public void configure() {
>
> from(mqtt).routeId("mqtt").autoStartup(false).process(print);
>             }
>         });
>     }
>
>     private void enableRetain(CamelContext context) {
>         for (Route r : context.getRoutes()) {
>             Consumer c = r.getConsumer();
>             if (c.getEndpoint() instanceof JmsEndpoint) {
>                 JmsEndpoint ep = (JmsEndpoint) c.getEndpoint();
>                 JmsConfiguration cfg = ep.getConfiguration();
>                 if (cfg.getConnectionFactory() instanceof
> PooledConnectionFactory) {
>                     PooledConnectionFactory fact =
> (PooledConnectionFactory) cfg.getConnectionFactory();
>                     ActiveMQConnectionFactory a =
> (ActiveMQConnectionFactory) fact.getConnectionFactory();
>                     a.setUseRetroactiveConsumer(true);
>                 }
>             }
>         }
>     }
>
>     public static void main(String... args) throws Exception {
>         SendMQTTTest main = new SendMQTTTest();
>         main.configure();
>         Thread.sleep(10000000);
>     }
> }



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2