Posted to issues@camel.apache.org by "Viral Gohel (JIRA)" <ji...@apache.org> on 2018/09/13 16:58:00 UTC

[jira] [Commented] (CAMEL-12620) CompletionAwareAggregationStrategy onCompletion method null exchange.

    [ https://issues.apache.org/jira/browse/CAMEL-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613780#comment-16613780 ] 

Viral Gohel commented on CAMEL-12620:
-------------------------------------

Hi [~luismospinam], do you have a reproducer application that you can attach? It would help us narrow down and investigate the issue.

> CompletionAwareAggregationStrategy onCompletion method null exchange.
> ---------------------------------------------------------------------
>
>                 Key: CAMEL-12620
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12620
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>            Reporter: Luis Miguel
>            Priority: Major
>
> Hello.
>  
> We have a Camel project that processes CSV files. We created a class implementing "CompletionAwareAggregationStrategy" to aggregate each processed row, and we override its "aggregate" and "onCompletion" methods.
> Processing of the CSV file is parallelized via "parallelProcessing" with an "executorService" of 8 concurrent threads.
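>
> The executor bean itself is defined elsewhere in our configuration (not shown here); a minimal sketch of how such a fixed pool might be declared in Spring, with illustrative names, would be:
>
> {code:java}
> // Hypothetical bean definition for the executor injected into the route below;
> // the real configuration is not part of this report.
> @Bean("executorServicePublicationItemCsv")
> public ExecutorService executorServicePublicationItemCsv() {
>     return Executors.newFixedThreadPool(8); // 8 concurrent threads, as described above
> }
> {code}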
>  
> We are hitting a strange, intermittent issue: in the middle of processing a file, the "onCompletion" method is invoked (even though the file is not finished yet) with a NULL "Exchange" argument, which breaks the whole Camel route.
> As I said, this happens only rarely; when we reprocess the same file, the error is gone.
>  
>  
>  
> Here's the main route that processes the CSV file. Notice that for the split we create an instance of "CsvAggregationStrategy":
>  
> {code:java}
> @Qualifier("executorServicePublicationItemCsv")
> @Autowired
> private ExecutorService executorService;
>
> @Override
> public void configure() throws Exception {
>     String[] header = Arrays.stream(PublicationItemCsvFields.values())
>             .map(PublicationItemCsvFields::getText).toArray(String[]::new);
>     Map<String, String> csvFieldsByEntityAttribute = new HashMap<>();
>     mapFields(csvFieldsByEntityAttribute);
>
>     //@formatter:off
>     from("file:publicationItemData?delete={{routes.push-csv-to-service.delete-source-file}}")
>         .streamCaching()
>         .routeId("push-publication-item-csv-to-service")
>         .onException(Exception.class)
>             .handled(true)
>             .log(LoggingLevel.ERROR, "Error in publication item route, sending an email: ${exception.message} ${exception.stacktrace}")
>             .to("direct:sendImportErrorReport")
>         .end()
>         .log(LoggingLevel.INFO, "Beginning to import publication item CSV: ${file:onlyname}")
>         .unmarshal(new CsvDataFormat()
>             .setSkipHeaderRecord(true)
>             .setNullString(EMPTY)
>             .setLazyLoad(true))
>         .split(body(), new CsvAggregationStrategy())
>             .streaming()
>             .parallelProcessing().executorService(executorService)
>             .to("direct:publication-item-splitter")
>         .end()
>         .choice()
>             .when(simple("${exchangeProperty.aggregationError} != null"))
>                 .log("An error occurred when aggregating exchanges, sending an email with the error.")
>                 .setProperty("original_body", body())
>                 .to("direct:sendAggregationErrorEmail")
>                 .setBody(exchangeProperty("original_body"))
>         .end()
>         .choice()
>             .when(simple("${exchangeProperty.badCsvData.size()} > 0"))
>                 .setBody(simple("${exchangeProperty.badCsvData}"))
>                 .marshal(new CsvDataFormat().setHeader(header))
>                 .setProperty("badRowsBody").simple("${body}")
>         .end()
>         .choice()
>             .when(simple("${exchangeProperty.successfulRecords.size()} > 0"))
>                 .setBody(simple("${exchangeProperty.successfulRecords}"))
>                 .marshal(new CsvDataFormat().setHeader(header))
>                 .setProperty("successfulRowsBody").simple("${body}")
>         .end()
>         .to("direct:sendImportReport").end()
>         .log("Completed import for publication item CSV: '${file:onlyname}'");
>
>     from("direct:publication-item-splitter")
>         .streamCaching()
>         .routeId("push-publication-item-splitter")
>         .onException(PublicationItemImportException.class)
>             .handled(true)
>             .log(LoggingLevel.ERROR, "Error importing publication item data: ${exception.message} ${exception.stacktrace}")
>         .end()
>         .onException(HttpHostConnectException.class)
>             .handled(true)
>             .log(LoggingLevel.ERROR, "Error connecting to publication item service host: ${exception.host}. Request body: ${body}")
>         .end()
>         .onException(HttpOperationFailedException.class)
>             .handled(true)
>             .log(LoggingLevel.ERROR, "Error received from publication item service: HTTP ${exception.statusCode}. Response body: ${exception.responseBody}. Request body: ${body}")
>         .end()
>         .onException(Exception.class)
>             .handled(true)
>             .log(LoggingLevel.ERROR, "Error: ${exception.message} ${exception.stacktrace}")
>         .end()
>         .setProperty("csvRowData").simple("${body}", List.class)
>         .setProperty("csvFieldsByEntityAttribute").constant(csvFieldsByEntityAttribute)
>         .bean(publicationItemCSVDataHandler)
>         .marshal().json(JsonLibrary.Jackson)
>         .setHeader(HttpHeaders.AUTHORIZATION, simple("Basic " + propertyServiceAuthorization))
>         .log("Item ID: ${property.itemId}")
>         .choice()
>             .when().simple("${property.itemId} != null")
>                 .setHeader(Exchange.HTTP_PATH, simple("${property.itemId}"))
>                 .to("rest:PUT:items?host={{backend.event-service.host}}")
>             .otherwise()
>                 .to("rest:POST:items?host={{backend.event-service.host}}")
>         .end()
>         .setProperty("responseId").jsonpath("$.id", true)
>         .setProperty("idColumnPosition").constant(PublicationItemCsvFields.ID.getNumber())
>         .choice()
>             .when(exchangeProperty("responseId").isNull())
>                 .throwException(PublicationItemImportException.class, "Unexpected rest response (no id returned)")
>             .otherwise()
>         .end();
>     //@formatter:on
> }
> {code}
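>
> A stripped-down route exercising the same pattern (split + streaming + parallelProcessing with a custom executor and a completion-aware strategy) might look like the sketch below; endpoint names are illustrative, not from our project:
>
> {code:java}
> // Hypothetical minimal reproducer of the pattern above; endpoint names are made up.
> from("file:inbox")
>     .unmarshal(new CsvDataFormat().setSkipHeaderRecord(true).setLazyLoad(true))
>     .split(body(), new CsvAggregationStrategy())
>         .streaming()
>         .parallelProcessing().executorService(Executors.newFixedThreadPool(8))
>         .to("direct:processRow")
>     .end()
>     .log("Completed: ${file:onlyname}");
> {code}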
>  
> And this is the CsvAggregationStrategy; we're getting a NullPointerException in the onCompletion method due to a null exchange.
>  
> {code:java}
> @Slf4j
> public class CsvAggregationStrategy implements CompletionAwareAggregationStrategy {
>
>     @Override
>     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
>         try {
>             if (oldExchange == null) {
>                 oldExchange = newExchange;
>                 oldExchange.setProperty("badCsvData", new TreeMap<>()); // TreeMap ensures a sorted order
>                 oldExchange.setProperty("successfulRecords", new TreeMap<>());
>             }
>             if (newExchange.getProperty(Exchange.SPLIT_COMPLETE, boolean.class)) {
>                 oldExchange.setProperty("numberOfCSVRows", newExchange.getProperty(Exchange.SPLIT_SIZE));
>             }
>             Exception exception = newExchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
>             if (exception != null) {
>                 @SuppressWarnings("unchecked")
>                 Map<Integer, Object> badCsvData = oldExchange.getProperty("badCsvData", Map.class);
>                 @SuppressWarnings("unchecked")
>                 List<String> csvRowData = newExchange.getProperty("csvRowData", List.class);
>                 if (exception instanceof HttpOperationFailedException) {
>                     Map<String, String> csvFieldsByEntityAttribute =
>                             newExchange.getProperty("csvFieldsByEntityAttribute", Map.class);
>                     String responseBody = ((HttpOperationFailedException) exception).getResponseBody();
>                     String errorMessage = getErrorMessage(responseBody, csvFieldsByEntityAttribute,
>                             ((HttpOperationFailedException) exception).getStatusCode());
>                     csvRowData.add(errorMessage);
>                 } else {
>                     csvRowData.add(exception.getMessage());
>                 }
>                 badCsvData.put(newExchange.getProperty(Exchange.SPLIT_INDEX, Integer.class), csvRowData);
>                 oldExchange.setProperty("badCsvData", badCsvData);
>             } else {
>                 @SuppressWarnings("unchecked")
>                 Map<Integer, Object> successRecords = oldExchange.getProperty("successfulRecords", Map.class);
>                 @SuppressWarnings("unchecked")
>                 List<String> csvRowData = newExchange.getProperty("csvRowData", List.class);
>                 Integer idPosition = (Integer) newExchange.getProperty("idColumnPosition");
>                 if (idPosition != null) {
>                     csvRowData.set(idPosition, (String) newExchange.getProperty("responseId"));
>                 } else {
>                     csvRowData.add((String) newExchange.getProperty("responseId"));
>                 }
>                 successRecords.put(newExchange.getProperty(Exchange.SPLIT_INDEX, Integer.class), csvRowData);
>                 oldExchange.setProperty("successfulRecords", successRecords);
>             }
>         } catch (Exception e) {
>             log.error("Error when trying to aggregate exchanges: " + e.getMessage(), e);
>             if (oldExchange != null) {
>                 oldExchange.setProperty("aggregationError", ExceptionUtils.getStackTrace(e));
>             }
>         }
>         return oldExchange;
>     }
>
>     @Override
>     public void onCompletion(Exchange exchange) {
>         @SuppressWarnings("unchecked")
>         Map<Integer, List<String>> badCsvData = exchange.getProperty("badCsvData", Map.class);
>         exchange.setProperty("badCsvData", new ArrayList<>(badCsvData.values()));
>         @SuppressWarnings("unchecked")
>         Map<Integer, List<String>> successfulCsvData = exchange.getProperty("successfulRecords", Map.class);
>         exchange.setProperty("successfulRecords", new ArrayList<>(successfulCsvData.values()));
>         /* Remove exception/failure properties if any occurred while processing the CSV rows. */
>         exchange.removeProperties("CamelFailure*");
>         exchange.removeProperties("CamelException*");
>         exchange.removeProperties("CamelError*");
>     }
> }
> {code}
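>
> As a temporary guard on our side (not a fix for the underlying issue), we could null-check the argument at the top of onCompletion so a spurious callback does not blow up the route; a minimal sketch:
>
> {code:java}
> @Override
> public void onCompletion(Exchange exchange) {
>     // Defensive guard only: skip post-processing instead of throwing an NPE
>     // when the callback arrives with a null exchange (the behaviour reported here).
>     if (exchange == null) {
>         log.warn("onCompletion invoked with a null exchange; skipping post-processing");
>         return;
>     }
>     // ... existing post-processing of badCsvData / successfulRecords ...
> }
> {code}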
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)