You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Shing Hing Man <ma...@yahoo.com> on 2013/09/01 10:46:43 UTC
Using Camel Redis component to subcribe to a channel
Hi,
I am trying to use Camel (2.11.1) Redis component to subscribe to a channel without success.
I have looked at the test case
org.apache.camel.component.redis.RedisConsumerIntegrationTest
By the way, the above test case is disabled by the Junit @Ignore.
My code below is a clone of RedisConsumerIntegrationTest . It receives no message when I publish to mychannel from a Redis command prompt.
(There is no error.)
public class RedisSubscriberRouter {
public static void main(String[] args) throws Exception {
JedisShardInfo hostInfo = new JedisShardInfo("localhost", "6379");
hostInfo.setPassword("xxx");
final JedisConnectionFactory CONNECTION_FACTORY = new JedisConnectionFactory(
hostInfo);
final RedisMessageListenerContainer LISTENER_CONTAINER = new RedisMessageListenerContainer();
CONNECTION_FACTORY.afterPropertiesSet();
LISTENER_CONTAINER.setConnectionFactory(CONNECTION_FACTORY);
LISTENER_CONTAINER.afterPropertiesSet();
RedisTemplate<String, String> redisTemplate = new RedisTemplate<String, String>();
redisTemplate.setConnectionFactory(CONNECTION_FACTORY);
redisTemplate.afterPropertiesSet();
SimpleRegistry registry = new SimpleRegistry();
registry.put("redisTemplate", redisTemplate);
registry.put("listenerContainer", LISTENER_CONTAINER);
// create CamelContext
CamelContext context = new DefaultCamelContext(registry);
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from(
"spring-redis://localhost:6379?command=SUBSCRIBE&channels=mychannel&listenerContainer=#listenerContainer")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
String res = exchange.getIn().getBody().toString();
System.out.println("************ " + res);
exchange.getOut().setBody(res);
}
})
.to("log:foo");
}
});
context.start();
System.out.println("Press any key to shutdown.");
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
br.readLine();
context.stop();
}
}
What is wrong my code above ?
Thanks in advance for any assistance !
Shing
Re: Using Camel Redis component to subcribe to a channel
Posted by Shing Hing Man <ma...@yahoo.com>.
Hi Bilgin,
I publish to the channnel from the redi-cli command line client :
redis 127.0.0.1:6379> publish mychannel Hello
(integer) 1
redis 127.0.0.1:6379>
I have not read the mentioned post in Stackoverflow.
After following the suggestion in the post and adding a serializer, my Camel Redis endpoint can receive messages published to a channel.
Below is a bit more details of the changes.
Following the suggestion in the Stackoverflow post, I added
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<!-- IMPORTANT - as of 10-May-2013 the Redis Camel component only works
with version 1.0.0.RELASE -->
<version>1.0.0.RELEASE</version>
</dependency>
to my pom.xml,
In addition to above,
1) I add a serializer to the registry.
registry.put("serializer", new StringRedisSerializer());
2) Use the endpoint : "spring-redis://localhost:6379?command=SUBSCRIBE&channels=mychannel&listenerContainer=#listenerContainer&serializer=#serializer"
Thanks for your help!
Shing
________________________________
From: Bilgin Ibryam <bi...@gmail.com>
To: users@camel.apache.org; Shing Hing Man <ma...@yahoo.com>
Sent: Sunday, September 1, 2013 2:07 PM
Subject: Re: Using Camel Redis component to subcribe to a channel
Hi Shing,
How are you publishing messages to Redis? Is it with Camel producer or
with other custom code?
Also have you seen this http://stackoverflow.com/a/16498040
HTH
Bilgin
On 1 September 2013 09:46, Shing Hing Man <ma...@yahoo.com> wrote:
> Hi,
>
> I am trying to use Camel (2.11.1) Redis component to subscribe to a channel without success.
>
> I have looked at the test case
>
> org.apache.camel.component.redis.RedisConsumerIntegrationTest
>
> By the way, the above test case is disabled by the Junit @Ignore.
>
> My code below is a clone of RedisConsumerIntegrationTest . It receives no message when I publish to mychannel from a Redis command prompt.
> (There is no error.)
>
> public class RedisSubscriberRouter {
>
> public static void main(String[] args) throws Exception {
>
> JedisShardInfo hostInfo = new JedisShardInfo("localhost", "6379");
> hostInfo.setPassword("xxx");
>
> final JedisConnectionFactory CONNECTION_FACTORY = new JedisConnectionFactory(
> hostInfo);
> final RedisMessageListenerContainer LISTENER_CONTAINER = new RedisMessageListenerContainer();
>
> CONNECTION_FACTORY.afterPropertiesSet();
> LISTENER_CONTAINER.setConnectionFactory(CONNECTION_FACTORY);
> LISTENER_CONTAINER.afterPropertiesSet();
>
> RedisTemplate<String, String> redisTemplate = new RedisTemplate<String, String>();
> redisTemplate.setConnectionFactory(CONNECTION_FACTORY);
> redisTemplate.afterPropertiesSet();
>
>
> SimpleRegistry registry = new SimpleRegistry();
> registry.put("redisTemplate", redisTemplate);
> registry.put("listenerContainer", LISTENER_CONTAINER);
> // create CamelContext
> CamelContext context = new DefaultCamelContext(registry);
>
> context.addRoutes(new RouteBuilder() {
> @Override
> public void configure() throws Exception {
>
> from(
> "spring-redis://localhost:6379?command=SUBSCRIBE&channels=mychannel&listenerContainer=#listenerContainer")
> .process(new Processor() {
> @Override
> public void process(Exchange exchange) throws Exception {
> String res = exchange.getIn().getBody().toString();
> System.out.println("************ " + res);
> exchange.getOut().setBody(res);
> }
> })
> .to("log:foo");
> }
>
> });
> context.start();
>
> System.out.println("Press any key to shutdown.");
> BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
> br.readLine();
>
> context.stop();
> }
>
> }
>
>
> What is wrong my code above ?
> Thanks in advance for any assistance !
>
> Shing
Re: Using Camel Redis component to subcribe to a channel
Posted by Bilgin Ibryam <bi...@gmail.com>.
Hi Shing,
How are you publishing messages to Redis? Is it with Camel producer or
with other custom code?
Also have you seen this http://stackoverflow.com/a/16498040
HTH
Bilgin
On 1 September 2013 09:46, Shing Hing Man <ma...@yahoo.com> wrote:
> Hi,
>
> I am trying to use Camel (2.11.1) Redis component to subscribe to a channel without success.
>
> I have looked at the test case
>
> org.apache.camel.component.redis.RedisConsumerIntegrationTest
>
> By the way, the above test case is disabled by the Junit @Ignore.
>
> My code below is a clone of RedisConsumerIntegrationTest . It receives no message when I publish to mychannel from a Redis command prompt.
> (There is no error.)
>
> public class RedisSubscriberRouter {
>
> public static void main(String[] args) throws Exception {
>
> JedisShardInfo hostInfo = new JedisShardInfo("localhost", "6379");
> hostInfo.setPassword("xxx");
>
> final JedisConnectionFactory CONNECTION_FACTORY = new JedisConnectionFactory(
> hostInfo);
> final RedisMessageListenerContainer LISTENER_CONTAINER = new RedisMessageListenerContainer();
>
> CONNECTION_FACTORY.afterPropertiesSet();
> LISTENER_CONTAINER.setConnectionFactory(CONNECTION_FACTORY);
> LISTENER_CONTAINER.afterPropertiesSet();
>
> RedisTemplate<String, String> redisTemplate = new RedisTemplate<String, String>();
> redisTemplate.setConnectionFactory(CONNECTION_FACTORY);
> redisTemplate.afterPropertiesSet();
>
>
> SimpleRegistry registry = new SimpleRegistry();
> registry.put("redisTemplate", redisTemplate);
> registry.put("listenerContainer", LISTENER_CONTAINER);
> // create CamelContext
> CamelContext context = new DefaultCamelContext(registry);
>
> context.addRoutes(new RouteBuilder() {
> @Override
> public void configure() throws Exception {
>
> from(
> "spring-redis://localhost:6379?command=SUBSCRIBE&channels=mychannel&listenerContainer=#listenerContainer")
> .process(new Processor() {
> @Override
> public void process(Exchange exchange) throws Exception {
> String res = exchange.getIn().getBody().toString();
> System.out.println("************ " + res);
> exchange.getOut().setBody(res);
> }
> })
> .to("log:foo");
> }
>
> });
> context.start();
>
> System.out.println("Press any key to shutdown.");
> BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
> br.readLine();
>
> context.stop();
> }
>
> }
>
>
> What is wrong my code above ?
> Thanks in advance for any assistance !
>
> Shing