You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Ramon Rosa da Silva (JIRA)" <ji...@apache.org> on 2015/04/08 17:19:12 UTC

[jira] [Updated] (CAMEL-8604) [RabbitMQ] Allow Remote procedure call (RPC) in camel-rabbitmq

     [ https://issues.apache.org/jira/browse/CAMEL-8604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ramon Rosa da Silva updated CAMEL-8604:
---------------------------------------
    Description: 
I would like RabbitMQ component knew how to handle messages in PRC format more easily.
RabbitMQ PRC Sample: https://www.rabbitmq.com/tutorials/tutorial-six-java.html

Currently, to receive and reply in RPC is required (as server):
{code:title=RouteSample.java|borderStyle=solid}
from("rabbitmq://rabbit-server:5672/myExc?routingKey=myQueue&queue=myQueue&vhost=myVhost&autoAck=false&prefetchEnabled=true&prefetchCount=10&concurrentConsumers=10&declare=true")
.transacted ()
.setProperty (RabbitMQConstants.REPLY_TO) .header (RabbitMQConstants.REPLY_TO)
.to ("sql: ......")
.setHeader (RabbitMQConstants.ROUTING_KEY) .property (RabbitMQConstants.REPLY_TO)
.setHeader (RabbitMQConstants.EXCHANGE_NAME) .simple ("")
.to("rabbitmq://rabbit-server:5672/_required_fake_field?routingKey=_required_fake_field&vhost=myVhost&threadPoolSize=1");
{code}

So you must save "reply_to" from header to properties, set fake exchange and routing key, and overwrite with setHeader.

Would be simpler:
- The consumer save the reply_to in properties.
- Enable exchange and routing key blank.
- Use reply_to property as routhing key when exchange and key is blank.

Test unit for improvement:
{code:title=ProducerTest.java|borderStyle=solid}
    private RabbitMQEndpoint endpoint = Mockito.mock(RabbitMQEndpoint.class);
    private Exchange exchange = Mockito.mock(Exchange.class);
    private Message message = new DefaultMessage();
    private Connection conn = Mockito.mock(Connection.class);
    private Channel channel = Mockito.mock(Channel.class);;

    @Before
    public void before() throws IOException {
        Mockito.when(exchange.getIn()).thenReturn(message);
        Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn);
        Mockito.when(conn.createChannel()).thenReturn(null);
    }

       @Test
       public void testRpcReplyTo() throws Exception {             
             message = Mockito.mock(Message.class);
             Mockito.when(exchange.getIn()).thenReturn(message);         
             Mockito.when(message.getHeader(RabbitMQConstants.EXCHANGE_NAME, String.class)).thenReturn("");
             
             Mockito.when(conn.createChannel()).thenReturn(channel);
             RabbitMQProducer producer = new RabbitMQProducer(endpoint);
             exchange.setFromEndpoint(endpoint);
             exchange.setProperty(RabbitMQConstants.REPLY_TO, "queue_to_reply");
             
             producer.process(exchange);
             
             Mockito.verify(channel).basicPublish(Mockito.eq(""), Mockito.eq("queue_to_reply"), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
       }
       {code}


  was:
I would like RabbitMQ component knew how to handle messages in PRC format more easily.
RabbitMQ PRC Sample: https://www.rabbitmq.com/tutorials/tutorial-six-java.html

Currently, to receive and reply in RPC is required (as server):
{code:title=Bar.java|borderStyle=solid}
from("rabbitmq://rabbit-server:5672/myExc?routingKey=myQueue&queue=myQueue&vhost=myVhost&autoAck=false&prefetchEnabled=true&prefetchCount=10&concurrentConsumers=10&declare=true")
.transacted ()
.setProperty (RabbitMQConstants.REPLY_TO) .header (RabbitMQConstants.REPLY_TO)
.to ("sql: ......")
.setHeader (RabbitMQConstants.ROUTING_KEY) .property (RabbitMQConstants.REPLY_TO)
.setHeader (RabbitMQConstants.EXCHANGE_NAME) .simple ("")
.to("rabbitmq://rabbit-server:5672/_required_fake_field?routingKey=_required_fake_field&vhost=myVhost&threadPoolSize=1");
{code}

So you must save "reply_to" from header to properties, set fake exchange and routing key, and overwrite with setHeader.

Would be simpler:
- The consumer save the reply_to in properties.
- Enable exchange and routing key blank.
- Use reply_to property as routhing key when exchange and key is blank.

Test unit for improvement:
{code:title=Bar.java|borderStyle=solid}
    private RabbitMQEndpoint endpoint = Mockito.mock(RabbitMQEndpoint.class);
    private Exchange exchange = Mockito.mock(Exchange.class);
    private Message message = new DefaultMessage();
    private Connection conn = Mockito.mock(Connection.class);
    private Channel channel = Mockito.mock(Channel.class);;

    @Before
    public void before() throws IOException {
        Mockito.when(exchange.getIn()).thenReturn(message);
        Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn);
        Mockito.when(conn.createChannel()).thenReturn(null);
    }

       @Test
       public void testRpcReplyTo() throws Exception {             
             message = Mockito.mock(Message.class);
             Mockito.when(exchange.getIn()).thenReturn(message);         
             Mockito.when(message.getHeader(RabbitMQConstants.EXCHANGE_NAME, String.class)).thenReturn("");
             
             Mockito.when(conn.createChannel()).thenReturn(channel);
             RabbitMQProducer producer = new RabbitMQProducer(endpoint);
             exchange.setFromEndpoint(endpoint);
             exchange.setProperty(RabbitMQConstants.REPLY_TO, "queue_to_reply");
             
             producer.process(exchange);
             
             Mockito.verify(channel).basicPublish(Mockito.eq(""), Mockito.eq("queue_to_reply"), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
       }
       {code}



> [RabbitMQ] Allow Remote procedure call (RPC) in camel-rabbitmq
> --------------------------------------------------------------
>
>                 Key: CAMEL-8604
>                 URL: https://issues.apache.org/jira/browse/CAMEL-8604
>             Project: Camel
>          Issue Type: Wish
>          Components: camel-rabbitmq
>    Affects Versions: 2.15.1
>            Reporter: Ramon Rosa da Silva
>            Priority: Minor
>   Original Estimate: 12h
>  Remaining Estimate: 12h
>
> I would like RabbitMQ component knew how to handle messages in PRC format more easily.
> RabbitMQ PRC Sample: https://www.rabbitmq.com/tutorials/tutorial-six-java.html
> Currently, to receive and reply in RPC is required (as server):
> {code:title=RouteSample.java|borderStyle=solid}
> from("rabbitmq://rabbit-server:5672/myExc?routingKey=myQueue&queue=myQueue&vhost=myVhost&autoAck=false&prefetchEnabled=true&prefetchCount=10&concurrentConsumers=10&declare=true")
> .transacted ()
> .setProperty (RabbitMQConstants.REPLY_TO) .header (RabbitMQConstants.REPLY_TO)
> .to ("sql: ......")
> .setHeader (RabbitMQConstants.ROUTING_KEY) .property (RabbitMQConstants.REPLY_TO)
> .setHeader (RabbitMQConstants.EXCHANGE_NAME) .simple ("")
> .to("rabbitmq://rabbit-server:5672/_required_fake_field?routingKey=_required_fake_field&vhost=myVhost&threadPoolSize=1");
> {code}
> So you must save "reply_to" from header to properties, set fake exchange and routing key, and overwrite with setHeader.
> Would be simpler:
> - The consumer save the reply_to in properties.
> - Enable exchange and routing key blank.
> - Use reply_to property as routhing key when exchange and key is blank.
> Test unit for improvement:
> {code:title=ProducerTest.java|borderStyle=solid}
>     private RabbitMQEndpoint endpoint = Mockito.mock(RabbitMQEndpoint.class);
>     private Exchange exchange = Mockito.mock(Exchange.class);
>     private Message message = new DefaultMessage();
>     private Connection conn = Mockito.mock(Connection.class);
>     private Channel channel = Mockito.mock(Channel.class);;
>     @Before
>     public void before() throws IOException {
>         Mockito.when(exchange.getIn()).thenReturn(message);
>         Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn);
>         Mockito.when(conn.createChannel()).thenReturn(null);
>     }
>        @Test
>        public void testRpcReplyTo() throws Exception {             
>              message = Mockito.mock(Message.class);
>              Mockito.when(exchange.getIn()).thenReturn(message);         
>              Mockito.when(message.getHeader(RabbitMQConstants.EXCHANGE_NAME, String.class)).thenReturn("");
>              
>              Mockito.when(conn.createChannel()).thenReturn(channel);
>              RabbitMQProducer producer = new RabbitMQProducer(endpoint);
>              exchange.setFromEndpoint(endpoint);
>              exchange.setProperty(RabbitMQConstants.REPLY_TO, "queue_to_reply");
>              
>              producer.process(exchange);
>              
>              Mockito.verify(channel).basicPublish(Mockito.eq(""), Mockito.eq("queue_to_reply"), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
>        }
>        {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)