You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Shing Hing Man <ma...@yahoo.com> on 2013/09/01 10:46:43 UTC

Using Camel Redis component to subcribe to a channel

Hi,

I am trying to use Camel (2.11.1) Redis component to subscribe to a channel without success.  

I have looked at the test case 

 org.apache.camel.component.redis.RedisConsumerIntegrationTest 

By the way, the above test case is disabled  by the Junit @Ignore. 

My code below is a clone of RedisConsumerIntegrationTest . It receives no message when I publish to mychannel from a Redis command prompt.
(There is no error.)

public class RedisSubscriberRouter {

    public static void main(String[] args) throws Exception {

        JedisShardInfo hostInfo = new JedisShardInfo("localhost", "6379");
        hostInfo.setPassword("xxx");
        
        final JedisConnectionFactory CONNECTION_FACTORY = new JedisConnectionFactory(
                hostInfo);
        final RedisMessageListenerContainer LISTENER_CONTAINER = new RedisMessageListenerContainer();
        
        CONNECTION_FACTORY.afterPropertiesSet();
        LISTENER_CONTAINER.setConnectionFactory(CONNECTION_FACTORY);
        LISTENER_CONTAINER.afterPropertiesSet();

        RedisTemplate<String, String> redisTemplate = new RedisTemplate<String, String>();
        redisTemplate.setConnectionFactory(CONNECTION_FACTORY);
        redisTemplate.afterPropertiesSet();


        SimpleRegistry registry = new SimpleRegistry();
        registry.put("redisTemplate", redisTemplate);
        registry.put("listenerContainer", LISTENER_CONTAINER);
        // create CamelContext
        CamelContext context = new DefaultCamelContext(registry);

        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {

                from(
                        "spring-redis://localhost:6379?command=SUBSCRIBE&channels=mychannel&listenerContainer=#listenerContainer")
                        .process(new Processor() {
                            @Override
                            public void process(Exchange exchange) throws Exception {
                                String res = exchange.getIn().getBody().toString();
                                System.out.println("************ " + res); 
                                exchange.getOut().setBody(res);
                            }
                        })
                    .to("log:foo");
            }

        });
        context.start();

        System.out.println("Press any key to shutdown.");
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        br.readLine();

        context.stop();
    }

}


What is wrong my code above  ? 
Thanks in advance for any assistance !

Shing

Re: Using Camel Redis component to subcribe to a channel

Posted by Shing Hing Man <ma...@yahoo.com>.

Hi Bilgin,
    I publish to the channnel from the redi-cli command line client :

 redis 127.0.0.1:6379> publish mychannel Hello
(integer) 1
redis 127.0.0.1:6379> 

I have not read the mentioned post in Stackoverflow. 
After following the suggestion in the post  and adding a serializer,  my Camel Redis endpoint can receive messages published to a channel.
Below is a bit more details of the changes. 


Following the suggestion in the Stackoverflow post, I added 
<dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-redis</artifactId>
            <!-- IMPORTANT - as of 10-May-2013 the Redis Camel component only works 
                with version 1.0.0.RELASE -->
            <version>1.0.0.RELEASE</version>
        </dependency>

to my pom.xml,

In addition to above, 
1) I add a serializer  to the registry. 
registry.put("serializer", new StringRedisSerializer());

2) Use the endpoint : "spring-redis://localhost:6379?command=SUBSCRIBE&channels=mychannel&listenerContainer=#listenerContainer&serializer=#serializer"

Thanks for your help!

Shing







________________________________
 From: Bilgin Ibryam <bi...@gmail.com>
To: users@camel.apache.org; Shing Hing Man <ma...@yahoo.com> 
Sent: Sunday, September 1, 2013 2:07 PM
Subject: Re: Using Camel Redis component to subcribe to a channel
 

Hi Shing,

How are you publishing messages to Redis? Is it with Camel producer or
with other custom code?

Also have you seen this http://stackoverflow.com/a/16498040

HTH
Bilgin


On 1 September 2013 09:46, Shing Hing Man <ma...@yahoo.com> wrote:
> Hi,
>
> I am trying to use Camel (2.11.1) Redis component to subscribe to a channel without success.
>
> I have looked at the test case
>
>  org.apache.camel.component.redis.RedisConsumerIntegrationTest
>
> By the way, the above test case is disabled  by the Junit @Ignore.
>
> My code below is a clone of RedisConsumerIntegrationTest . It receives no message when I publish to mychannel from a Redis command prompt.
> (There is no error.)
>
> public class RedisSubscriberRouter {
>
>     public static void main(String[] args) throws Exception {
>
>         JedisShardInfo hostInfo = new JedisShardInfo("localhost", "6379");
>         hostInfo.setPassword("xxx");
>
>         final JedisConnectionFactory CONNECTION_FACTORY = new JedisConnectionFactory(
>                 hostInfo);
>         final RedisMessageListenerContainer LISTENER_CONTAINER = new RedisMessageListenerContainer();
>
>         CONNECTION_FACTORY.afterPropertiesSet();
>         LISTENER_CONTAINER.setConnectionFactory(CONNECTION_FACTORY);
>         LISTENER_CONTAINER.afterPropertiesSet();
>
>         RedisTemplate<String, String> redisTemplate = new RedisTemplate<String, String>();
>         redisTemplate.setConnectionFactory(CONNECTION_FACTORY);
>         redisTemplate.afterPropertiesSet();
>
>
>         SimpleRegistry registry = new SimpleRegistry();
>         registry.put("redisTemplate", redisTemplate);
>         registry.put("listenerContainer", LISTENER_CONTAINER);
>         // create CamelContext
>         CamelContext context = new DefaultCamelContext(registry);
>
>         context.addRoutes(new RouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>
>                 from(
>                         "spring-redis://localhost:6379?command=SUBSCRIBE&channels=mychannel&listenerContainer=#listenerContainer")
>                         .process(new Processor() {
>                             @Override
>                             public void process(Exchange exchange) throws Exception {
>                                 String res = exchange.getIn().getBody().toString();
>                                 System.out.println("************ " + res);
>                                 exchange.getOut().setBody(res);
>                             }
>                         })
>                     .to("log:foo");
>             }
>
>         });
>         context.start();
>
>         System.out.println("Press any key to shutdown.");
>         BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
>         br.readLine();
>
>         context.stop();
>     }
>
> }
>
>
> What is wrong my code above  ?
> Thanks in advance for any assistance !
>
> Shing

Re: Using Camel Redis component to subcribe to a channel

Posted by Bilgin Ibryam <bi...@gmail.com>.
Hi Shing,

How are you publishing messages to Redis? Is it with Camel producer or
with other custom code?

Also have you seen this http://stackoverflow.com/a/16498040

HTH
Bilgin


On 1 September 2013 09:46, Shing Hing Man <ma...@yahoo.com> wrote:
> Hi,
>
> I am trying to use Camel (2.11.1) Redis component to subscribe to a channel without success.
>
> I have looked at the test case
>
>  org.apache.camel.component.redis.RedisConsumerIntegrationTest
>
> By the way, the above test case is disabled  by the Junit @Ignore.
>
> My code below is a clone of RedisConsumerIntegrationTest . It receives no message when I publish to mychannel from a Redis command prompt.
> (There is no error.)
>
> public class RedisSubscriberRouter {
>
>     public static void main(String[] args) throws Exception {
>
>         JedisShardInfo hostInfo = new JedisShardInfo("localhost", "6379");
>         hostInfo.setPassword("xxx");
>
>         final JedisConnectionFactory CONNECTION_FACTORY = new JedisConnectionFactory(
>                 hostInfo);
>         final RedisMessageListenerContainer LISTENER_CONTAINER = new RedisMessageListenerContainer();
>
>         CONNECTION_FACTORY.afterPropertiesSet();
>         LISTENER_CONTAINER.setConnectionFactory(CONNECTION_FACTORY);
>         LISTENER_CONTAINER.afterPropertiesSet();
>
>         RedisTemplate<String, String> redisTemplate = new RedisTemplate<String, String>();
>         redisTemplate.setConnectionFactory(CONNECTION_FACTORY);
>         redisTemplate.afterPropertiesSet();
>
>
>         SimpleRegistry registry = new SimpleRegistry();
>         registry.put("redisTemplate", redisTemplate);
>         registry.put("listenerContainer", LISTENER_CONTAINER);
>         // create CamelContext
>         CamelContext context = new DefaultCamelContext(registry);
>
>         context.addRoutes(new RouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>
>                 from(
>                         "spring-redis://localhost:6379?command=SUBSCRIBE&channels=mychannel&listenerContainer=#listenerContainer")
>                         .process(new Processor() {
>                             @Override
>                             public void process(Exchange exchange) throws Exception {
>                                 String res = exchange.getIn().getBody().toString();
>                                 System.out.println("************ " + res);
>                                 exchange.getOut().setBody(res);
>                             }
>                         })
>                     .to("log:foo");
>             }
>
>         });
>         context.start();
>
>         System.out.println("Press any key to shutdown.");
>         BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
>         br.readLine();
>
>         context.stop();
>     }
>
> }
>
>
> What is wrong my code above  ?
> Thanks in advance for any assistance !
>
> Shing