You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by atg roxx <at...@gmail.com> on 2014/07/08 17:36:07 UTC

onException is not working as expected.

Hi Team,

I am using multicast with AggreationStrategy an also using onException on
my route.

Happy case is working fine,

But when there is any exception, my exception processor handle it and set a
header and add error message in the body of the exchange.

When call go back to aggregator, it check whether the body of the exchange
received  is not null and of type Class A or not. if not then we know this
exchange is returned by the Exception handler. and we return that exchange.

When testing for exception , my mock end point does not receive exchange
set by the exception processor.

Kindly guide me where I am getting things wrong.:

Below is the source code:

Route definition :-------------------------------------

 onException(RecoverableException.class)
                .maximumRedeliveries(2)
                .handled(true)
                .beanRef("myCustomeExceptionHandler","handleException");


   from("{{route.direct.endpoint}}")
            .routeId(ROUTE_ID)
            .multicast(new MyAggregationStrategy())
                .parallelProcessing()
                .timeout(COMPLETION_TIMEOUT)
                .to(GET_DETAILS, GET_SUMMARY, GET_IMAGE)
            .end()
            .marshal(responseFormat);

        from(GET_SUMMARY)
            .choice()
                .when(summaryIsRequested)
                    .beanRef("myCustomeService", "getSummary")
            .otherwise()
                .setBody().constant(null)
                .log(LoggingLevel.DEBUG, "Skipping  summary - not
requested.")
            .end();

        from(GET_DETAILS)
            .choice()
                .when(detailsAreRequested)
                    .beanRef("myCustomeService", "getDetails")
            .otherwise()
                .setBody().constant(null)
                .log(LoggingLevel.DEBUG, "Skipping  details - not
requested.")
            .end();

        from(GET_IMAGE)
            .routeId("podImageRoute")
            .choice()
                .when(podImageIsRequested)
                    .beanRef("myCustomeService", "getImage")
            .otherwise()
                .setBody().constant(null)
                .log(LoggingLevel.DEBUG, "Skipping image - not requested.")
            .end();

Exception Handler
:------------------------------------------------------------------
public class HttpResponseExceptionHandler implements ExceptionHandler {
@Override
public void handleRecoverableException(Exchange exchange) {
Throwable cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT,
Exception.class);
String failureRouteId = exchange.getProperty(Exchange.FAILURE_ROUTE_ID,
String.class);
String errorMessage = cause.getMessage();
LOGGER.error("Exception occurred in the route {}. Exception details are:
{}", failureRouteId, errorMessage);
exchange.getIn().removeHeaders("*");
exchange.getIn().setHeader(Exchange.HTTP_RESPONSE_CODE, statusCode);
exchange.getIn().setBody(errorMessage);
}
}

Aggregator code: ----------------------------

public class MyCustomeAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

        if (oldExchange == null) { // occurs for a new group
            return newExchange;
        }

        if (exchangeHasNonNullBody(oldExchange) &&
exchangeContainBodyofTypeClassA(oldExchange)) {
            oldBody=oldExchange.getIn().getBody();
            } else {
                return oldExchange; // assuming that body contain string
message set in Exception Handler
            }
        }

        if (exchangeHasNonNullBody(newExchange) &&
exchangeContainBodyofTypeClassA(newExchange)) {
            newBody=newExchange.getIn().getBody();
            } else {
                return newExchange; // assuming that body contain string
message set in Exception Handler
            }
        }

         MergeBodyofOldAndNewExchange(oldExchange,oldBody,newBody);

        return oldExchange;
    }

----------------------

Test case :---------------------


 @DirtiesContext
    @Test
    public void testExceptionHandeling() throws Exception {

        // Given
        when(myService.getImage(anyString(), anyString())).thenThrow(new
ConnectionException("Exception of type ConnectionException"));
        mockEndpointResult.expectedMessageCount(1);

        // When
        try {
            template.sendBody("{{route.direct.endpoint}}", request);
            Thread.sleep(1000); // wait a few moments...
            //then
            mockEndpointResult.assertIsSatisfied();  // fails here

        } catch (CamelExecutionException ex) {
            //then
           fail();
        }
    }


-- Regards,
atg roxx

Re: onException is not working as expected.

Posted by atg roxx <at...@gmail.com>.
could one one please have a look and reply to it.


On Tue, Jul 8, 2014 at 4:36 PM, atg roxx <at...@gmail.com> wrote:

> Hi Team,
>
> I am using multicast with AggreationStrategy an also using onException on
> my route.
>
> Happy case is working fine,
>
> But when there is any exception, my exception processor handle it and set
> a header and add error message in the body of the exchange.
>
> When call go back to aggregator, it check whether the body of the exchange
> received  is not null and of type Class A or not. if not then we know this
> exchange is returned by the Exception handler. and we return that exchange.
>
> When testing for exception , my mock end point does not receive exchange
> set by the exception processor.
>
> Kindly guide me where I am getting things wrong.:
>
> Below is the source code:
>
> Route definition :-------------------------------------
>
>  onException(RecoverableException.class)
>                 .maximumRedeliveries(2)
>                 .handled(true)
>                 .beanRef("myCustomeExceptionHandler","handleException");
>
>
>    from("{{route.direct.endpoint}}")
>             .routeId(ROUTE_ID)
>             .multicast(new MyAggregationStrategy())
>                 .parallelProcessing()
>                 .timeout(COMPLETION_TIMEOUT)
>                 .to(GET_DETAILS, GET_SUMMARY, GET_IMAGE)
>             .end()
>             .marshal(responseFormat);
>
>         from(GET_SUMMARY)
>             .choice()
>                 .when(summaryIsRequested)
>                     .beanRef("myCustomeService", "getSummary")
>             .otherwise()
>                 .setBody().constant(null)
>                 .log(LoggingLevel.DEBUG, "Skipping  summary - not
> requested.")
>             .end();
>
>         from(GET_DETAILS)
>             .choice()
>                 .when(detailsAreRequested)
>                     .beanRef("myCustomeService", "getDetails")
>             .otherwise()
>                 .setBody().constant(null)
>                 .log(LoggingLevel.DEBUG, "Skipping  details - not
> requested.")
>             .end();
>
>         from(GET_IMAGE)
>             .routeId("podImageRoute")
>             .choice()
>                 .when(podImageIsRequested)
>                     .beanRef("myCustomeService", "getImage")
>             .otherwise()
>                 .setBody().constant(null)
>                 .log(LoggingLevel.DEBUG, "Skipping image - not requested.")
>             .end();
>
> Exception Handler
> :------------------------------------------------------------------
> public class HttpResponseExceptionHandler implements ExceptionHandler {
>  @Override
> public void handleRecoverableException(Exchange exchange) {
> Throwable cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT,
> Exception.class);
>  String failureRouteId = exchange.getProperty(Exchange.FAILURE_ROUTE_ID,
> String.class);
> String errorMessage = cause.getMessage();
>  LOGGER.error("Exception occurred in the route {}. Exception details are:
> {}", failureRouteId, errorMessage);
> exchange.getIn().removeHeaders("*");
>  exchange.getIn().setHeader(Exchange.HTTP_RESPONSE_CODE, statusCode);
> exchange.getIn().setBody(errorMessage);
>  }
> }
>
> Aggregator code: ----------------------------
>
> public class MyCustomeAggregationStrategy implements AggregationStrategy {
>
>     @Override
>     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
>
>         if (oldExchange == null) { // occurs for a new group
>             return newExchange;
>         }
>
>         if (exchangeHasNonNullBody(oldExchange) &&
> exchangeContainBodyofTypeClassA(oldExchange)) {
>             oldBody=oldExchange.getIn().getBody();
>             } else {
>                 return oldExchange; // assuming that body contain string
> message set in Exception Handler
>             }
>         }
>
>         if (exchangeHasNonNullBody(newExchange) &&
> exchangeContainBodyofTypeClassA(newExchange)) {
>             newBody=newExchange.getIn().getBody();
>             } else {
>                 return newExchange; // assuming that body contain string
> message set in Exception Handler
>             }
>         }
>
>          MergeBodyofOldAndNewExchange(oldExchange,oldBody,newBody);
>
>         return oldExchange;
>     }
>
> ----------------------
>
> Test case :---------------------
>
>
>  @DirtiesContext
>     @Test
>     public void testExceptionHandeling() throws Exception {
>
>         // Given
>         when(myService.getImage(anyString(), anyString())).thenThrow(new
> ConnectionException("Exception of type ConnectionException"));
>         mockEndpointResult.expectedMessageCount(1);
>
>         // When
>         try {
>             template.sendBody("{{route.direct.endpoint}}", request);
>             Thread.sleep(1000); // wait a few moments...
>             //then
>             mockEndpointResult.assertIsSatisfied();  // fails here
>
>         } catch (CamelExecutionException ex) {
>             //then
>            fail();
>         }
>     }
>
>
> -- Regards,
> atg roxx
>
>

Re: onException is not working as expected.

Posted by atg roxx <at...@gmail.com>.
Hi Jiang,

The mock has been defined as :

 <bean id="myService" class="org.mockito.Mockito" factory-method="mock">
        <constructor-arg value="com.test.StrategicServiceProxy" />
    </bean>

-Regards,
Atg roxx


On Fri, Jul 11, 2014 at 1:03 PM, Willem Jiang <wi...@gmail.com>
wrote:

> Hi,
>
> It looks like you are using some kind of MOCK API to throw the exception.
> Can you show me the code how did you setup the mocked POJO bean?
> --
> Willem Jiang
>
> Red Hat, Inc.
> Web: http://www.redhat.com
> Blog: http://willemjiang.blogspot.com (English)
> http://jnn.iteye.com (Chinese)
> Twitter: willemjiang
> Weibo: 姜宁willem
>
>
>
> On July 8, 2014 at 11:36:38 PM, atg roxx (atgroxx@gmail.com) wrote:
> > Hi Team,
> >
> > I am using multicast with AggreationStrategy an also using onException on
> > my route.
> >
> > Happy case is working fine,
> >
> > But when there is any exception, my exception processor handle it and
> set a
> > header and add error message in the body of the exchange.
> >
> > When call go back to aggregator, it check whether the body of the
> exchange
> > received is not null and of type Class A or not. if not then we know this
> > exchange is returned by the Exception handler. and we return that
> exchange.
> >
> > When testing for exception , my mock end point does not receive exchange
> > set by the exception processor.
> >
> > Kindly guide me where I am getting things wrong.:
> >
> > Below is the source code:
> >
> > Route definition :-------------------------------------
> >
> > onException(RecoverableException.class)
> > .maximumRedeliveries(2)
> > .handled(true)
> > .beanRef("myCustomeExceptionHandler","handleException");
> >
> >
> > from("{{route.direct.endpoint}}")
> > .routeId(ROUTE_ID)
> > .multicast(new MyAggregationStrategy())
> > .parallelProcessing()
> > .timeout(COMPLETION_TIMEOUT)
> > .to(GET_DETAILS, GET_SUMMARY, GET_IMAGE)
> > .end()
> > .marshal(responseFormat);
> >
> > from(GET_SUMMARY)
> > .choice()
> > .when(summaryIsRequested)
> > .beanRef("myCustomeService", "getSummary")
> > .otherwise()
> > .setBody().constant(null)
> > .log(LoggingLevel.DEBUG, "Skipping summary - not
> > requested.")
> > .end();
> >
> > from(GET_DETAILS)
> > .choice()
> > .when(detailsAreRequested)
> > .beanRef("myCustomeService", "getDetails")
> > .otherwise()
> > .setBody().constant(null)
> > .log(LoggingLevel.DEBUG, "Skipping details - not
> > requested.")
> > .end();
> >
> > from(GET_IMAGE)
> > .routeId("podImageRoute")
> > .choice()
> > .when(podImageIsRequested)
> > .beanRef("myCustomeService", "getImage")
> > .otherwise()
> > .setBody().constant(null)
> > .log(LoggingLevel.DEBUG, "Skipping image - not requested.")
> > .end();
> >
> > Exception Handler
> > :------------------------------------------------------------------
> > public class HttpResponseExceptionHandler implements ExceptionHandler {
> > @Override
> > public void handleRecoverableException(Exchange exchange) {
> > Throwable cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT,
> > Exception.class);
> > String failureRouteId = exchange.getProperty(Exchange.FAILURE_ROUTE_ID,
> > String.class);
> > String errorMessage = cause.getMessage();
> > LOGGER.error("Exception occurred in the route {}. Exception details are:
> > {}", failureRouteId, errorMessage);
> > exchange.getIn().removeHeaders("*");
> > exchange.getIn().setHeader(Exchange.HTTP_RESPONSE_CODE, statusCode);
> > exchange.getIn().setBody(errorMessage);
> > }
> > }
> >
> > Aggregator code: ----------------------------
> >
> > public class MyCustomeAggregationStrategy implements AggregationStrategy
> {
> >
> > @Override
> > public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
> >
> > if (oldExchange == null) { // occurs for a new group
> > return newExchange;
> > }
> >
> > if (exchangeHasNonNullBody(oldExchange) &&
> > exchangeContainBodyofTypeClassA(oldExchange)) {
> > oldBody=oldExchange.getIn().getBody();
> > } else {
> > return oldExchange; // assuming that body contain string
> > message set in Exception Handler
> > }
> > }
> >
> > if (exchangeHasNonNullBody(newExchange) &&
> > exchangeContainBodyofTypeClassA(newExchange)) {
> > newBody=newExchange.getIn().getBody();
> > } else {
> > return newExchange; // assuming that body contain string
> > message set in Exception Handler
> > }
> > }
> >
> > MergeBodyofOldAndNewExchange(oldExchange,oldBody,newBody);
> >
> > return oldExchange;
> > }
> >
> > ----------------------
> >
> > Test case :---------------------
> >
> >
> > @DirtiesContext
> > @Test
> > public void testExceptionHandeling() throws Exception {
> >
> > // Given
> > when(myService.getImage(anyString(), anyString())).thenThrow(new
> > ConnectionException("Exception of type ConnectionException"));
> > mockEndpointResult.expectedMessageCount(1);
> >
> > // When
> > try {
> > template.sendBody("{{route.direct.endpoint}}", request);
> > Thread.sleep(1000); // wait a few moments...
> > //then
> > mockEndpointResult.assertIsSatisfied(); // fails here
> >
> > } catch (CamelExecutionException ex) {
> > //then
> > fail();
> > }
> > }
> >
> >
> > -- Regards,
> > atg roxx
> >
>
>

Re: onException is not working as expected.

Posted by Willem Jiang <wi...@gmail.com>.
Hi,

It looks like you are using some kind of MOCK API to throw the exception.
Can you show me the code how did you setup the mocked POJO bean?
--  
Willem Jiang

Red Hat, Inc.
Web: http://www.redhat.com
Blog: http://willemjiang.blogspot.com (English)
http://jnn.iteye.com (Chinese)
Twitter: willemjiang  
Weibo: 姜宁willem



On July 8, 2014 at 11:36:38 PM, atg roxx (atgroxx@gmail.com) wrote:
> Hi Team,
>  
> I am using multicast with AggreationStrategy an also using onException on
> my route.
>  
> Happy case is working fine,
>  
> But when there is any exception, my exception processor handle it and set a
> header and add error message in the body of the exchange.
>  
> When call go back to aggregator, it check whether the body of the exchange
> received is not null and of type Class A or not. if not then we know this
> exchange is returned by the Exception handler. and we return that exchange.
>  
> When testing for exception , my mock end point does not receive exchange
> set by the exception processor.
>  
> Kindly guide me where I am getting things wrong.:
>  
> Below is the source code:
>  
> Route definition :-------------------------------------
>  
> onException(RecoverableException.class)
> .maximumRedeliveries(2)
> .handled(true)
> .beanRef("myCustomeExceptionHandler","handleException");
>  
>  
> from("{{route.direct.endpoint}}")
> .routeId(ROUTE_ID)
> .multicast(new MyAggregationStrategy())
> .parallelProcessing()
> .timeout(COMPLETION_TIMEOUT)
> .to(GET_DETAILS, GET_SUMMARY, GET_IMAGE)
> .end()
> .marshal(responseFormat);
>  
> from(GET_SUMMARY)
> .choice()
> .when(summaryIsRequested)
> .beanRef("myCustomeService", "getSummary")
> .otherwise()
> .setBody().constant(null)
> .log(LoggingLevel.DEBUG, "Skipping summary - not
> requested.")
> .end();
>  
> from(GET_DETAILS)
> .choice()
> .when(detailsAreRequested)
> .beanRef("myCustomeService", "getDetails")
> .otherwise()
> .setBody().constant(null)
> .log(LoggingLevel.DEBUG, "Skipping details - not
> requested.")
> .end();
>  
> from(GET_IMAGE)
> .routeId("podImageRoute")
> .choice()
> .when(podImageIsRequested)
> .beanRef("myCustomeService", "getImage")
> .otherwise()
> .setBody().constant(null)
> .log(LoggingLevel.DEBUG, "Skipping image - not requested.")
> .end();
>  
> Exception Handler
> :------------------------------------------------------------------
> public class HttpResponseExceptionHandler implements ExceptionHandler {
> @Override
> public void handleRecoverableException(Exchange exchange) {
> Throwable cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT,
> Exception.class);
> String failureRouteId = exchange.getProperty(Exchange.FAILURE_ROUTE_ID,
> String.class);
> String errorMessage = cause.getMessage();
> LOGGER.error("Exception occurred in the route {}. Exception details are:
> {}", failureRouteId, errorMessage);
> exchange.getIn().removeHeaders("*");
> exchange.getIn().setHeader(Exchange.HTTP_RESPONSE_CODE, statusCode);
> exchange.getIn().setBody(errorMessage);
> }
> }
>  
> Aggregator code: ----------------------------
>  
> public class MyCustomeAggregationStrategy implements AggregationStrategy {
>  
> @Override
> public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
>  
> if (oldExchange == null) { // occurs for a new group
> return newExchange;
> }
>  
> if (exchangeHasNonNullBody(oldExchange) &&
> exchangeContainBodyofTypeClassA(oldExchange)) {
> oldBody=oldExchange.getIn().getBody();
> } else {
> return oldExchange; // assuming that body contain string
> message set in Exception Handler
> }
> }
>  
> if (exchangeHasNonNullBody(newExchange) &&
> exchangeContainBodyofTypeClassA(newExchange)) {
> newBody=newExchange.getIn().getBody();
> } else {
> return newExchange; // assuming that body contain string
> message set in Exception Handler
> }
> }
>  
> MergeBodyofOldAndNewExchange(oldExchange,oldBody,newBody);
>  
> return oldExchange;
> }
>  
> ----------------------
>  
> Test case :---------------------
>  
>  
> @DirtiesContext
> @Test
> public void testExceptionHandeling() throws Exception {
>  
> // Given
> when(myService.getImage(anyString(), anyString())).thenThrow(new
> ConnectionException("Exception of type ConnectionException"));
> mockEndpointResult.expectedMessageCount(1);
>  
> // When
> try {
> template.sendBody("{{route.direct.endpoint}}", request);
> Thread.sleep(1000); // wait a few moments...
> //then
> mockEndpointResult.assertIsSatisfied(); // fails here
>  
> } catch (CamelExecutionException ex) {
> //then
> fail();
> }
> }
>  
>  
> -- Regards,
> atg roxx
>